Federated Learning Demo in Python (Part 4): Working with Mobile Devices

Through the first 3 parts of our federated learning (FL) demo project, we’ve set up a system wherein machine learning (ML) models are trained using FL. Put simply, a generic model is created at the server. The model is then sent to the clients for training, and the trained model is then sent back to the server.

Check out the previous 3 parts in the project to get caught up:

In part 3, we allowed the user to interact with the server socket app through the terminal.

In this tutorial, which is the last part of the project, the client app will be made available on mobile devices, which allows federated learning to make use of the massive amounts of data available on them. Generally speaking, federated learning makes a lot of sense for mobile machine learning applications, as it allows for both model personalization and enhanced data security (vs. cloud-based training).

We’ll do this by building a GUI using Kivy for the client app. Once created in Kivy, the GUI app will then be exported to mobile and/or edge devices (Android, iOS, and Raspberry Pi). Similarly, a GUI is built for the server.

The code for this project is available at the Federated Learning GitHub project under the TutorialProject directory. The code for this specific tutorial is available under the TutorialProject/Part4 directory.

Here are the sections covered in this tutorial:

  • Installing Kivy
  • Building the GUI for the server app
  • Handling the actions for the server GUI app
  • Listening for connections in a thread
  • Complete server code
  • Building the GUI for the client app
  • Handling the actions for the client GUI app
  • Send and receive data in a thread
  • Complete client code
  • Building Android apps

Let’s get started.

Installing Kivy

Kivy is a Python framework for building cross-platform GUI applications. Kivy can be installed using the pip installer.

For Windows, run pip install kivy from CMD.

For Linux and Mac, replace pip with pip3 (pip3 install kivy).

Once installed, make sure it works well by running the code below, which creates a GUI with a button. You may need to install some additional libraries, like SDL, in order to build the Kivy application successfully.
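As a minimal sketch of such a check (not the original test script), the code below first confirms that Python can locate the kivy package and then defines a one-button app. The helper names kivy_installed() and make_test_app(), as well as the button text, are assumptions made for illustration. Kivy is imported lazily inside make_test_app() so that the file still loads when the installation is missing or broken.

```python
import importlib.util


def kivy_installed():
    """Return True if the import system can locate the kivy package."""
    return importlib.util.find_spec("kivy") is not None


def make_test_app():
    """Build a tiny Kivy app whose only widget is a button (hypothetical helper)."""
    # Imported here, not at module level, so a missing Kivy install
    # only fails when the GUI is actually requested.
    import kivy.app
    import kivy.uix.button

    class TestApp(kivy.app.App):
        def build(self):
            # The widget returned by build() becomes the root of the window.
            return kivy.uix.button.Button(text="Hello Kivy")

    return TestApp()


print("Kivy found:", kivy_installed())
```

If kivy_installed() returns True, calling make_test_app().run() should open a window that contains a single button.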

If everything is working properly, then we can proceed to the next section to build the GUI of the server application.

Building the GUI for the Server App

To create a Kivy app, the first step is to create a new class that extends the kivy.app.App class. This class has a method named build() in which the GUI is created.

The code below creates the Kivy widget tree for the server app in a new class named ServerApp. The app has the following widgets, in order:

  • Button: Create a socket
  • TextInput: IPv4 address of the server
  • TextInput: Port number of the server
  • Button: Bind the socket to the IPv4 address and port number entered in the TextInput widgets
  • Button: Listen and accept connections
  • Button: Close the socket
  • Label: Prints informational messages about the server

All of these widgets are placed inside a BoxLayout, which is then returned by the build() method.

import kivy.app
import kivy.uix.button
import kivy.uix.label
import kivy.uix.textinput
import kivy.uix.boxlayout

class ServerApp(kivy.app.App):

    def build(self):
        self.create_socket_btn = kivy.uix.button.Button(text="Create Socket", disabled=False)

        self.server_ip = kivy.uix.textinput.TextInput(hint_text="IPv4 Address", text="localhost")
        self.server_port = kivy.uix.textinput.TextInput(hint_text="Port Number", text="10000")

        self.server_socket_box_layout = kivy.uix.boxlayout.BoxLayout(orientation="horizontal")
        self.server_socket_box_layout.add_widget(self.server_ip)
        self.server_socket_box_layout.add_widget(self.server_port)

        self.bind_btn = kivy.uix.button.Button(text="Bind Socket", disabled=True)

        self.listen_btn = kivy.uix.button.Button(text="Listen to Connections", disabled=True)

        self.close_socket_btn = kivy.uix.button.Button(text="Close Socket", disabled=True)

        self.label = kivy.uix.label.Label(text="Socket Status")

        self.box_layout = kivy.uix.boxlayout.BoxLayout(orientation="vertical")

        self.box_layout.add_widget(self.create_socket_btn)
        self.box_layout.add_widget(self.server_socket_box_layout)
        self.box_layout.add_widget(self.bind_btn)
        self.box_layout.add_widget(self.listen_btn)
        self.box_layout.add_widget(self.close_socket_btn)
        self.box_layout.add_widget(self.label)

        return self.box_layout

To run the Kivy app, create an instance of the ServerApp class, and then call the run() method. The title of the window is set to Server App.

The next figure shows the results after running the app. Except for the button that creates the socket, all the buttons are disabled when the app starts. A button will be enabled only when needed.

Handling the Actions for the Server GUI App

Using the bind() method of Kivy widgets, a callback method can be registered to run when an event occurs. The next code binds a callback method to each button’s on_press event.

For the create_socket_btn button, its callback method is shown below. It creates the socket and prints a message on the Label widget, indicating that the socket has been created.

After the socket is created, the button responsible for creating the socket, create_socket_btn, is disabled, as there is no need for it anymore.

After a socket is created, the user might bind it to an address or close it. This is why the buttons responsible for binding (bind_btn) and closing (close_socket_btn) the socket are enabled.

The callback method of the close_socket_btn button is provided below. Once the socket is closed, all the buttons are disabled except for the create_socket_btn button that creates a socket.

For the bind_btn button, here is its callback method. It fetches the IPv4 address and the port number from the TextInput widgets and passes them to the socket’s bind() method. After that, the bind_btn button is disabled and the listen_btn button is enabled to allow the user to listen to and accept connections.

The callback method of the listen_btn is listed below. After calling the listen() method, an instance of a class named ListenThread is created and then started. This thread works in the background to listen to and accept connections.
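The enable/disable rules described in this section can be summarized as a small state table. The sketch below is plain Python with no Kivy dependency. BUTTON_CHANGES and enabled_after() are hypothetical names used only for illustration, and each entry mirrors the disabled flags that the corresponding callback sets.

```python
# For each action, the buttons whose enabled state changes (True = enabled).
BUTTON_CHANGES = {
    "create": {"create_socket_btn": False, "bind_btn": True, "close_socket_btn": True},
    "bind": {"bind_btn": False, "listen_btn": True},
    "listen": {"listen_btn": False},
    "close": {"create_socket_btn": True, "bind_btn": False,
              "listen_btn": False, "close_socket_btn": False},
}


def enabled_after(actions):
    """Return the sorted names of the buttons enabled after a sequence of actions."""
    # State when the app starts: only the create button is enabled.
    enabled = {"create_socket_btn": True, "bind_btn": False,
               "listen_btn": False, "close_socket_btn": False}
    for action in actions:
        enabled.update(BUTTON_CHANGES[action])
    return sorted(name for name, is_enabled in enabled.items() if is_enabled)


print(enabled_after(["create", "bind"]))
```

Walking through a full cycle (create, bind, listen, close) ends with only create_socket_btn enabled, which matches the state of the app when it starts.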

At this point, the GUI widgets of the server app have been placed and their actions have been handled. The code of the ServerApp is provided below. The next section discusses the implementation of the ListenThread class.

import socket

import kivy.app
import kivy.uix.button
import kivy.uix.label
import kivy.uix.textinput
import kivy.uix.boxlayout

class ServerApp(kivy.app.App):
    
    def __init__(self):
        super().__init__()

    def create_socket(self, *args):
        self.soc = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        self.label.text = "Socket Created"

        self.create_socket_btn.disabled = True
        self.bind_btn.disabled = False
        self.close_socket_btn.disabled = False

    def bind_socket(self, *args):
        ipv4_address = self.server_ip.text
        port_number = self.server_port.text
        self.soc.bind((ipv4_address, int(port_number)))
        self.label.text = "Socket Bound to IPv4 & Port Number"

        self.bind_btn.disabled = True
        self.listen_btn.disabled = False

    def listen_accept(self, *args):
        self.soc.listen(1)
        self.label.text = "Socket is Listening for Connections"

        self.listen_btn.disabled = True

        self.listenThread = ListenThread(kivy_app=self)
        self.listenThread.start()

    def close_socket(self, *args):
        self.soc.close()
        self.label.text = "Socket Closed"

        self.create_socket_btn.disabled = False
        self.bind_btn.disabled = True
        self.listen_btn.disabled = True
        self.close_socket_btn.disabled = True

    def build(self):
        self.create_socket_btn = kivy.uix.button.Button(text="Create Socket", disabled=False)
        self.create_socket_btn.bind(on_press=self.create_socket)

        self.server_ip = kivy.uix.textinput.TextInput(hint_text="IPv4 Address", text="localhost")
        self.server_port = kivy.uix.textinput.TextInput(hint_text="Port Number", text="10000")

        self.server_socket_box_layout = kivy.uix.boxlayout.BoxLayout(orientation="horizontal")
        self.server_socket_box_layout.add_widget(self.server_ip)
        self.server_socket_box_layout.add_widget(self.server_port)

        self.bind_btn = kivy.uix.button.Button(text="Bind Socket", disabled=True)
        self.bind_btn.bind(on_press=self.bind_socket)

        self.listen_btn = kivy.uix.button.Button(text="Listen to Connections", disabled=True)
        self.listen_btn.bind(on_press=self.listen_accept)

        self.close_socket_btn = kivy.uix.button.Button(text="Close Socket", disabled=True)
        self.close_socket_btn.bind(on_press=self.close_socket)

        self.label = kivy.uix.label.Label(text="Socket Status")

        self.box_layout = kivy.uix.boxlayout.BoxLayout(orientation="vertical")

        self.box_layout.add_widget(self.create_socket_btn)
        self.box_layout.add_widget(self.server_socket_box_layout)
        self.box_layout.add_widget(self.bind_btn)
        self.box_layout.add_widget(self.listen_btn)
        self.box_layout.add_widget(self.close_socket_btn)
        self.box_layout.add_widget(self.label)

        return self.box_layout

serverApp = ServerApp()
serverApp.title="Server App"
serverApp.run()

Listening for Connections in a Thread

Long-running operations such as listening for and accepting incoming connections should not run in the application’s main thread. The call that accepts a connection blocks until a client connects, so the GUI freezes and stops responding to user input while it waits. A better alternative is to create a background thread that listens for connections. The threading module in Python can be used for this purpose.
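To make the blocking behavior concrete, here is a small sketch that measures how long accept() waits when no client ever connects. The function name time_blocked_accept() and the 0.2 second timeout are assumptions for illustration. The server code in this tutorial sets no timeout on the listening socket, so there the call would wait indefinitely.

```python
import socket
import time


def time_blocked_accept(timeout_seconds=0.2):
    """Return how many seconds accept() blocks when no client connects."""
    server = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
    try:
        server.bind(("127.0.0.1", 0))  # Port 0 lets the OS pick a free port.
        server.listen(1)
        # Without a timeout, accept() would block until a client connects.
        server.settimeout(timeout_seconds)
        start = time.monotonic()
        try:
            server.accept()
        except socket.timeout:
            pass  # Nobody connected before the timeout expired.
        return time.monotonic() - start
    finally:
        server.close()


print("accept() blocked for {:.2f} seconds".format(time_blocked_accept()))
```

Any code placed after accept() in the same thread, including GUI event handling, has to wait for that call to return.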

Similar to the threads created in Part 2 and Part 3, we create a new thread as a class that extends the threading.Thread class. The run() method in the created class is executed once the thread is started.
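As a reminder of that pattern, here is a minimal, self-contained sketch that is unrelated to sockets. SquareThread is a hypothetical class used only to show that start() returns immediately while run() executes in the background.

```python
import threading


class SquareThread(threading.Thread):
    """Hypothetical worker that squares a number in a background thread."""

    def __init__(self, number):
        threading.Thread.__init__(self)
        self.number = number
        self.result = None

    def run(self):
        # Executed in the new thread once start() is called.
        self.result = self.number ** 2


worker = SquareThread(number=7)
worker.start()  # Returns immediately; run() executes in the background.
worker.join()   # Wait for run() to finish before reading the result.
print(worker.result)  # 49
```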

The following code creates a new class named ListenThread that extends the threading.Thread class. Its constructor accepts a single parameter named kivy_app, which is the reference to the Kivy app instance.

The run() method has an infinite while loop that waits for and accepts incoming connections. Once a connection is accepted, a new instance of the SocketThread class is created and started. The SocketThread class is discussed in Parts 2 and 3.

import threading

class ListenThread(threading.Thread):

    def __init__(self, kivy_app):
        threading.Thread.__init__(self)
        self.kivy_app = kivy_app

    def run(self):
        while True:
            try:
                connection, client_info = self.kivy_app.soc.accept()
                self.kivy_app.label.text = "New Connection from {client_info}".format(client_info=client_info)
                socket_thread = SocketThread(connection=connection,
                                             client_info=client_info, 
                                             kivy_app=self.kivy_app,
                                             buffer_size=1024,
                                             recv_timeout=10)
                socket_thread.start()
            except BaseException as e:
                self.kivy_app.soc.close()
                print(e)
                self.kivy_app.label.text = "Socket is No Longer Accepting Connections"
                self.kivy_app.create_socket_btn.disabled = False
                self.kivy_app.close_socket_btn.disabled = True
                break

The implementation of the SocketThread class is shown below. This class is responsible for receiving the clients’ data and also sending data to the clients. The implementation of the nn and gann modules is available at the Federated Learning GitHub project under the TutorialProject/Part4 directory.

import socket
import pickle
import threading
import time
import numpy

import nn
import gann

class SocketThread(threading.Thread):

    def __init__(self, connection, client_info, kivy_app, buffer_size=1024, recv_timeout=5):
        threading.Thread.__init__(self)
        self.connection = connection
        self.client_info = client_info
        self.buffer_size = buffer_size
        self.recv_timeout = recv_timeout
        self.kivy_app = kivy_app

    def recv(self):
        received_data = b""
        while True:
            try:
                
                data = self.connection.recv(self.buffer_size)
                received_data += data

                if data == b'': # Nothing received from the client.
                    received_data = b""
                    # If still nothing received for a number of seconds specified by the recv_timeout attribute, return with status 0 to close the connection.
                    if (time.time() - self.recv_start_time) > self.recv_timeout:
                        return None, 0 # 0 means the connection is no longer active and it should be closed.

                elif str(data)[-2] == '.':
                    print("All data ({data_len} bytes) Received from {client_info}.".format(client_info=self.client_info, data_len=len(received_data)))
                    self.kivy_app.label.text = "All data ({data_len} bytes) Received from {client_info}.".format(client_info=self.client_info, data_len=len(received_data))

                    if len(received_data) > 0:
                        try:
                            # Decoding the data (bytes).
                            received_data = pickle.loads(received_data)
                            # Returning the decoded data.
                            return received_data, 1

                        except BaseException as e:
                            print("Error Decoding the Client's Data: {msg}.\n".format(msg=e))
                            self.kivy_app.label.text = "Error Decoding the Client's Data"
                            return None, 0

                else:
                    # In case data are received from the client, update the recv_start_time to the current time to reset the timeout counter.
                    self.recv_start_time = time.time()

            except BaseException as e:
                print("Error Receiving Data from the Client: {msg}.\n".format(msg=e))
                self.kivy_app.label.text = "Error Receiving Data from the Client"
                return None, 0

    def model_averaging(self, model, other_model):
        model_weights = nn.layers_weights(last_layer=model, initial=False)
        other_model_weights = nn.layers_weights(last_layer=other_model, initial=False)
        
        new_weights = numpy.array(model_weights + other_model_weights)/2

        nn.update_layers_trained_weights(last_layer=model, final_weights=new_weights)

    def reply(self, received_data):
        global GANN_instance, data_inputs, data_outputs, model
        if (type(received_data) is dict):
            if (("data" in received_data.keys()) and ("subject" in received_data.keys())):
                subject = received_data["subject"]
                print("Client's Message Subject is {subject}.".format(subject=subject))
                self.kivy_app.label.text = "Client's Message Subject is {subject}".format(subject=subject)

                print("Replying to the Client.")
                self.kivy_app.label.text = "Replying to the Client"
                if subject == "echo":
                    if model is None:
                        data = {"subject": "model", "data": GANN_instance}
                    else:
                        predictions = nn.predict(last_layer=model, data_inputs=data_inputs)
                        error = numpy.sum(numpy.abs(predictions - data_outputs))
                        # In case a client sent a model to the server despite that the model error is 0.0. In this case, no need to make changes in the model.
                        if error == 0:
                            data = {"subject": "done", "data": None}
                        else:
                            data = {"subject": "model", "data": GANN_instance}

                    try:
                        response = pickle.dumps(data)
                    except BaseException as e:
                        print("Error Encoding the Message: {msg}.\n".format(msg=e))
                        self.kivy_app.label.text = "Error Encoding the Message"
                elif subject == "model":
                    try:
                        GANN_instance = received_data["data"]
                        best_model_idx = received_data["best_solution_idx"]

                        best_model = GANN_instance.population_networks[best_model_idx]
                        if model is None:
                            model = best_model
                        else:
                            predictions = nn.predict(last_layer=model, data_inputs=data_inputs)
    
                            error = numpy.sum(numpy.abs(predictions - data_outputs))
    
                            # In case a client sent a model to the server despite that the model error is 0.0. In this case, no need to make changes in the model.
                            if error == 0:
                                data = {"subject": "done", "data": None}
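                                response = pickle.dumps(data)
                                # Send the "done" reply before returning; otherwise the client never receives it.
                                self.connection.sendall(response)
                                return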

                            self.model_averaging(model, best_model)

                        # print(best_model.trained_weights)
                        # print(model.trained_weights)

                        predictions = nn.predict(last_layer=model, data_inputs=data_inputs)
                        print("Model Predictions: {predictions}".format(predictions=predictions))

                        error = numpy.sum(numpy.abs(predictions - data_outputs))
                        print("Prediction Error = {error}".format(error=error))
                        self.kivy_app.label.text = "Prediction Error = {error}".format(error=error)

                        if error != 0:
                            data = {"subject": "model", "data": GANN_instance}
                            response = pickle.dumps(data)
                        else:
                            data = {"subject": "done", "data": None}
                            response = pickle.dumps(data)

                    except BaseException as e:
                        print("Error Decoding the Client's Data: {msg}.\n".format(msg=e))
                        self.kivy_app.label.text = "Error Decoding the Client's Data"
                else:
                    response = pickle.dumps("Response from the Server")
                            
                try:
                    self.connection.sendall(response)
                except BaseException as e:
                    print("Error Sending Data to the Client: {msg}.\n".format(msg=e))
                    self.kivy_app.label.text = "Error Sending Data to the Client: {msg}".format(msg=e)

            else:
                print("The received dictionary from the client must have the 'subject' and 'data' keys available. The existing keys are {d_keys}.".format(d_keys=received_data.keys()))
                self.kivy_app.label.text = "Error Parsing Received Dictionary"
        else:
            print("A dictionary is expected to be received from the client but {d_type} received.".format(d_type=type(received_data)))
            self.kivy_app.label.text = "A dictionary is expected but {d_type} received.".format(d_type=type(received_data))

    def run(self):
        print("Running a Thread for the Connection with {client_info}.".format(client_info=self.client_info))
        self.kivy_app.label.text = "Running a Thread for the Connection with {client_info}.".format(client_info=self.client_info)

        # This while loop allows the server to wait for the client to send data more than once within the same connection.
        while True:
            self.recv_start_time = time.time()
            time_struct = time.gmtime()
            date_time = "Waiting to Receive Data Starting from {day}/{month}/{year} {hour}:{minute}:{second} GMT".format(year=time_struct.tm_year, month=time_struct.tm_mon, day=time_struct.tm_mday, hour=time_struct.tm_hour, minute=time_struct.tm_min, second=time_struct.tm_sec)
            print(date_time)
            received_data, status = self.recv()
            if status == 0:
                self.connection.close()
                self.kivy_app.label.text = "Connection Closed with {client_info}".format(client_info=self.client_info)
                print("Connection Closed with {client_info} either due to inactivity for {recv_timeout} seconds or due to an error.".format(client_info=self.client_info, recv_timeout=self.recv_timeout), end="\n\n")
                break

            # print(received_data)
            self.reply(received_data)
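The recv() method decides that a message is complete when the last received chunk ends with a period. This works because every pickle stream ends with the STOP opcode, which is the byte b'.', and because str() of a bytes object looks like b'...', so index -2 is the last byte of the chunk. The sketch below illustrates the idea with a hypothetical helper named ends_with_pickle_stop() and an 8-byte chunk size chosen only for the demonstration. Note that the check is a heuristic: a chunk in the middle of a large payload can also happen to end with that byte.

```python
import pickle


def ends_with_pickle_stop(chunk):
    """Same test as in recv(): does this chunk end with the pickle STOP opcode?"""
    return chunk.endswith(b".")


message = {"subject": "echo", "data": None}
payload = pickle.dumps(message)

# Simulate a small buffer_size by splitting the payload into 8-byte chunks.
chunks = [payload[i:i + 8] for i in range(0, len(payload), 8)]

received_data = b""
for chunk in chunks:
    received_data += chunk

# The final chunk closes the stream, so the accumulated bytes can be decoded.
print(ends_with_pickle_stop(chunks[-1]))
decoded = pickle.loads(received_data)
print(decoded == message)
```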

Complete Server Code

The complete code for the server is listed below. The code is available in the TutorialProject/Part4/server.py file under the Federated Learning GitHub project.

import socket
import pickle
import threading
import time
import numpy

import nn
import gann

import kivy.app
import kivy.uix.button
import kivy.uix.label
import kivy.uix.textinput
import kivy.uix.boxlayout

class ServerApp(kivy.app.App):
    
    def __init__(self):
        super().__init__()

    def create_socket(self, *args):
        self.soc = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        self.label.text = "Socket Created"

        self.create_socket_btn.disabled = True
        self.bind_btn.disabled = False
        self.close_socket_btn.disabled = False

    def bind_socket(self, *args):
        ipv4_address = self.server_ip.text
        port_number = self.server_port.text
        self.soc.bind((ipv4_address, int(port_number)))
        self.label.text = "Socket Bound to IPv4 & Port Number"

        self.bind_btn.disabled = True
        self.listen_btn.disabled = False

    def listen_accept(self, *args):
        self.soc.listen(1)
        self.label.text = "Socket is Listening for Connections"

        self.listen_btn.disabled = True

        self.listenThread = ListenThread(kivy_app=self)
        self.listenThread.start()

    def close_socket(self, *args):
        self.soc.close()
        self.label.text = "Socket Closed"

        self.create_socket_btn.disabled = False
        self.bind_btn.disabled = True
        self.listen_btn.disabled = True
        self.close_socket_btn.disabled = True

    def build(self):
        self.create_socket_btn = kivy.uix.button.Button(text="Create Socket", disabled=False)
        self.create_socket_btn.bind(on_press=self.create_socket)

        self.server_ip = kivy.uix.textinput.TextInput(hint_text="IPv4 Address", text="localhost")
        self.server_port = kivy.uix.textinput.TextInput(hint_text="Port Number", text="10000")

        self.server_socket_box_layout = kivy.uix.boxlayout.BoxLayout(orientation="horizontal")
        self.server_socket_box_layout.add_widget(self.server_ip)
        self.server_socket_box_layout.add_widget(self.server_port)

        self.bind_btn = kivy.uix.button.Button(text="Bind Socket", disabled=True)
        self.bind_btn.bind(on_press=self.bind_socket)

        self.listen_btn = kivy.uix.button.Button(text="Listen to Connections", disabled=True)
        self.listen_btn.bind(on_press=self.listen_accept)

        self.close_socket_btn = kivy.uix.button.Button(text="Close Socket", disabled=True)
        self.close_socket_btn.bind(on_press=self.close_socket)

        self.label = kivy.uix.label.Label(text="Socket Status")

        self.box_layout = kivy.uix.boxlayout.BoxLayout(orientation="vertical")

        self.box_layout.add_widget(self.create_socket_btn)
        self.box_layout.add_widget(self.server_socket_box_layout)
        self.box_layout.add_widget(self.bind_btn)
        self.box_layout.add_widget(self.listen_btn)
        self.box_layout.add_widget(self.close_socket_btn)
        self.box_layout.add_widget(self.label)

        return self.box_layout

model = None

# Preparing the NumPy array of the inputs.
data_inputs = numpy.array([[1, 1],
                           [1, 0],
                           [0, 1],
                           [0, 0]])

# Preparing the NumPy array of the outputs.
data_outputs = numpy.array([0, 
                            1, 
                            1, 
                            0])

num_classes = 2
num_inputs = 2

num_solutions = 6
GANN_instance = gann.GANN(num_solutions=num_solutions,
                                num_neurons_input=num_inputs,
                                num_neurons_hidden_layers=[2],
                                num_neurons_output=num_classes,
                                hidden_activations=["relu"],
                                output_activation="softmax")

class SocketThread(threading.Thread):

    def __init__(self, connection, client_info, kivy_app, buffer_size=1024, recv_timeout=5):
        threading.Thread.__init__(self)
        self.connection = connection
        self.client_info = client_info
        self.buffer_size = buffer_size
        self.recv_timeout = recv_timeout
        self.kivy_app = kivy_app

    def recv(self):
        received_data = b""
        while True:
            try:
                
                data = self.connection.recv(self.buffer_size)
                received_data += data

                if data == b'': # Nothing received from the client.
                    received_data = b""
                    # If still nothing received for a number of seconds specified by the recv_timeout attribute, return with status 0 to close the connection.
                    if (time.time() - self.recv_start_time) > self.recv_timeout:
                        return None, 0 # 0 means the connection is no longer active and it should be closed.

                elif str(data)[-2] == '.':
                    print("All data ({data_len} bytes) Received from {client_info}.".format(client_info=self.client_info, data_len=len(received_data)))
                    self.kivy_app.label.text = "All data ({data_len} bytes) Received from {client_info}.".format(client_info=self.client_info, data_len=len(received_data))

                    if len(received_data) > 0:
                        try:
                            # Decoding the data (bytes).
                            received_data = pickle.loads(received_data)
                            # Returning the decoded data.
                            return received_data, 1

                        except BaseException as e:
                            print("Error Decoding the Client's Data: {msg}.\n".format(msg=e))
                            self.kivy_app.label.text = "Error Decoding the Client's Data"
                            return None, 0

                else:
                    # In case data are received from the client, update the recv_start_time to the current time to reset the timeout counter.
                    self.recv_start_time = time.time()

            except BaseException as e:
                print("Error Receiving Data from the Client: {msg}.\n".format(msg=e))
                self.kivy_app.label.text = "Error Receiving Data from the Client"
                return None, 0

    def model_averaging(self, model, other_model):
        model_weights = nn.layers_weights(last_layer=model, initial=False)
        other_model_weights = nn.layers_weights(last_layer=other_model, initial=False)
        
        new_weights = numpy.array(model_weights + other_model_weights)/2

        nn.update_layers_trained_weights(last_layer=model, final_weights=new_weights)

    def reply(self, received_data):
        global GANN_instance, data_inputs, data_outputs, model
        if (type(received_data) is dict):
            if (("data" in received_data.keys()) and ("subject" in received_data.keys())):
                subject = received_data["subject"]
                print("Client's Message Subject is {subject}.".format(subject=subject))
                self.kivy_app.label.text = "Client's Message Subject is {subject}".format(subject=subject)

                print("Replying to the Client.")
                self.kivy_app.label.text = "Replying to the Client"
                if subject == "echo":
                    if model is None:
                        data = {"subject": "model", "data": GANN_instance}
                    else:
                        predictions = nn.predict(last_layer=model, data_inputs=data_inputs)
                        error = numpy.sum(numpy.abs(predictions - data_outputs))
                        # In case a client sent a model to the server despite that the model error is 0.0. In this case, no need to make changes in the model.
                        if error == 0:
                            data = {"subject": "done", "data": None}
                        else:
                            data = {"subject": "model", "data": GANN_instance}

                    try:
                        response = pickle.dumps(data)
                    except BaseException as e:
                        print("Error Encoding the Message: {msg}.\n".format(msg=e))
                        self.kivy_app.label.text = "Error Encoding the Message"
                elif subject == "model":
                    try:
                        GANN_instance = received_data["data"]
                        best_model_idx = received_data["best_solution_idx"]

                        best_model = GANN_instance.population_networks[best_model_idx]
                        if model is None:
                            model = best_model
                        else:
                            predictions = nn.predict(last_layer=model, data_inputs=data_inputs)
    
                            error = numpy.sum(numpy.abs(predictions - data_outputs))
    
                            # In case a client sent a model to the server despite that the model error is 0.0. In this case, no need to make changes in the model.
                            if error == 0:
                                data = {"subject": "done", "data": None}
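                                response = pickle.dumps(data)
                                # Send the "done" reply before returning; otherwise the client never receives it.
                                self.connection.sendall(response)
                                return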

                            self.model_averaging(model, best_model)

                        # print(best_model.trained_weights)
                        # print(model.trained_weights)

                        predictions = nn.predict(last_layer=model, data_inputs=data_inputs)
                        print("Model Predictions: {predictions}".format(predictions=predictions))

                        error = numpy.sum(numpy.abs(predictions - data_outputs))
                        print("Prediction Error = {error}".format(error=error))
                        self.kivy_app.label.text = "Prediction Error = {error}".format(error=error)

                        if error != 0:
                            data = {"subject": "model", "data": GANN_instance}
                            response = pickle.dumps(data)
                        else:
                            data = {"subject": "done", "data": None}
                            response = pickle.dumps(data)

                    except BaseException as e:
                        print("Error Decoding the Client's Data: {msg}.\n".format(msg=e))
                        self.kivy_app.label.text = "Error Decoding the Client's Data"
                else:
                    response = pickle.dumps("Response from the Server")
                            
                try:
                    self.connection.sendall(response)
                except BaseException as e:
                    print("Error Sending Data to the Client: {msg}.\n".format(msg=e))
                    self.kivy_app.label.text = "Error Sending Data to the Client: {msg}".format(msg=e)

            else:
                print("The received dictionary from the client must have the 'subject' and 'data' keys available. The existing keys are {d_keys}.".format(d_keys=received_data.keys()))
                self.kivy_app.label.text = "Error Parsing Received Dictionary"
        else:
            print("A dictionary is expected to be received from the client but {d_type} received.".format(d_type=type(received_data)))
            self.kivy_app.label.text = "A dictionary is expected but {d_type} received.".format(d_type=type(received_data))

    def run(self):
        print("Running a Thread for the Connection with {client_info}.".format(client_info=self.client_info))
        self.kivy_app.label.text = "Running a Thread for the Connection with {client_info}.".format(client_info=self.client_info)

        # This while loop allows the server to wait for the client to send data more than once within the same connection.
        while True:
            self.recv_start_time = time.time()
            time_struct = time.gmtime()
            date_time = "Waiting to Receive Data Starting from {day}/{month}/{year} {hour}:{minute}:{second} GMT".format(year=time_struct.tm_year, month=time_struct.tm_mon, day=time_struct.tm_mday, hour=time_struct.tm_hour, minute=time_struct.tm_min, second=time_struct.tm_sec)
            print(date_time)
            received_data, status = self.recv()
            if status == 0:
                self.connection.close()
                self.kivy_app.label.text = "Connection Closed with {client_info}".format(client_info=self.client_info)
                print("Connection Closed with {client_info} either due to inactivity for {recv_timeout} seconds or due to an error.".format(client_info=self.client_info, recv_timeout=self.recv_timeout), end="\n\n")
                break

            # print(received_data)
            self.reply(received_data)

class ListenThread(threading.Thread):

    def __init__(self, kivy_app):
        threading.Thread.__init__(self)
        self.kivy_app = kivy_app

    def run(self):
        while True:
            try:
                connection, client_info = self.kivy_app.soc.accept()
                self.kivy_app.label.text = "New Connection from {client_info}".format(client_info=client_info)
                socket_thread = SocketThread(connection=connection,
                                             client_info=client_info, 
                                             kivy_app=self.kivy_app,
                                             buffer_size=1024,
                                             recv_timeout=10)
                socket_thread.start()
            except BaseException as e:
                self.kivy_app.soc.close()
                print(e)
                self.kivy_app.label.text = "Socket is No Longer Accepting Connections"
                self.kivy_app.create_socket_btn.disabled = False
                self.kivy_app.close_socket_btn.disabled = True
                break

serverApp = ServerApp()
serverApp.title="Server App"
serverApp.run()

Building the GUI for the Client App

Similar to the server app, we’ll build a GUI for the client app using Kivy. In order, the widgets for the client app are as follows:

  • Button: Create a socket.
  • TextInput: IPv4 address of the server.
  • TextInput: Port number of the server.
  • Button: Connect to the server through its socket specified by the IPv4 address and port number entered in the TextInput widgets.
  • Button: Receive a model to be trained on the client’s local data.
  • Button: Close the socket.
  • Label: Prints informational messages about the client.

A BoxLayout groups all of these widgets together. All the buttons are disabled, except for the button that creates the socket. The buttons are only enabled when needed.

The code below creates a class named ClientApp that extends kivy.app.App and overrides its build() method to build the widget tree.

class ClientApp(kivy.app.App):

    def build(self):
        self.create_socket_btn = kivy.uix.button.Button(text="Create Socket")

        self.server_ip = kivy.uix.textinput.TextInput(hint_text="Server IPv4 Address", text="localhost")
        self.server_port = kivy.uix.textinput.TextInput(hint_text="Server Port Number", text="10000")

        self.server_info_boxlayout = kivy.uix.boxlayout.BoxLayout(orientation="horizontal")
        self.server_info_boxlayout.add_widget(self.server_ip)
        self.server_info_boxlayout.add_widget(self.server_port)

        self.connect_btn = kivy.uix.button.Button(text="Connect to Server", disabled=True)

        self.recv_train_model_btn = kivy.uix.button.Button(text="Receive & Train Model", disabled=True)

        self.close_socket_btn = kivy.uix.button.Button(text="Close Socket", disabled=True)

        self.label = kivy.uix.label.Label(text="Socket Status")

        self.box_layout = kivy.uix.boxlayout.BoxLayout(orientation="vertical")
        self.box_layout.add_widget(self.create_socket_btn)
        self.box_layout.add_widget(self.server_info_boxlayout)
        self.box_layout.add_widget(self.connect_btn)
        self.box_layout.add_widget(self.recv_train_model_btn)
        self.box_layout.add_widget(self.close_socket_btn)
        self.box_layout.add_widget(self.label)

        return self.box_layout

The next step is to create an instance of the ClientApp class and call its run() method to start the client app (clientApp = ClientApp() followed by clientApp.run()).

The next figure shows the window of the client app.

The next section discusses adding and handling the press action to all the buttons in the client app.

Handling the Actions for the Client GUI App

Using the bind() method of the Kivy widgets, callback methods are specified to handle the press action.

When pressed, the create_socket_btn button creates a socket through the create_socket() method. Once a socket is created, the create_socket_btn button is no longer needed, so it’s disabled. The connect_btn and close_socket_btn buttons are enabled to allow the user to either connect to a server or close the socket.

The close_socket() method handles the press action for the close_socket_btn button. After the socket is closed, all buttons are disabled except for create_socket_btn, which allows the user to create a new socket.

The connect() method handles the press action for the connect_btn button. It calls the connect() method of the socket, then disables the connect_btn button because it’s no longer needed. It also enables the recv_train_model_btn button to allow the user to receive the model from the server.

The recv_train_model() method is the callback for the recv_train_model_btn button’s press action. It disables the recv_train_model_btn button, then creates an instance of the RecvThread class (a thread) and starts it. This class was implemented in Part 2 and Part 3, and it’s responsible for sending data to and receiving data from the server.
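The enable/disable rules described in this section can be summarized without Kivy. The sketch below is only an illustration: the TRANSITIONS table and the press() helper are hypothetical names that don't appear in the project, and the set simply holds the names of the buttons that are currently enabled.

```python
# Hypothetical, Kivy-free model of the client buttons.
# For each callback: which buttons it enables and which it disables.
TRANSITIONS = {
    "create_socket": {"enable": {"connect_btn", "close_socket_btn"},
                      "disable": {"create_socket_btn"}},
    "connect": {"enable": {"recv_train_model_btn"},
                "disable": {"connect_btn"}},
    "recv_train_model": {"enable": set(),
                         "disable": {"recv_train_model_btn"}},
    "close_socket": {"enable": {"create_socket_btn"},
                     "disable": {"connect_btn", "recv_train_model_btn", "close_socket_btn"}},
}

def press(enabled, action):
    """Return the set of enabled buttons after the callback for `action` has run."""
    change = TRANSITIONS[action]
    return (enabled | change["enable"]) - change["disable"]

enabled = {"create_socket_btn"}  # Initial state: only the create button is enabled.
for action in ("create_socket", "connect", "recv_train_model", "close_socket"):
    enabled = press(enabled, action)
    print(action, "->", sorted(enabled))
```

After the full cycle, only create_socket_btn is enabled again, which matches the state the app starts in.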

The complete code for the ClientApp class is provided below. The next section discusses the implementation of the RecvThread class.

import kivy.app
import kivy.uix.button
import kivy.uix.label
import kivy.uix.boxlayout
import kivy.uix.textinput

import socket
import threading

class ClientApp(kivy.app.App):
    
    def __init__(self):
        super().__init__()

    def create_socket(self, *args):
        self.soc = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        self.label.text = "Socket Created"

        self.create_socket_btn.disabled = True
        self.connect_btn.disabled = False
        self.close_socket_btn.disabled = False

    def connect(self, *args):
        try:
            self.soc.connect((self.server_ip.text, int(self.server_port.text)))
            self.label.text = "Successful Connection to the Server"
    
            self.connect_btn.disabled = True
            self.recv_train_model_btn.disabled = False

        except BaseException as e:
            self.label.text = "Error Connecting to the Server"
            print("Error Connecting to the Server: {msg}".format(msg=e))

            self.connect_btn.disabled = False
            self.recv_train_model_btn.disabled = True

    def recv_train_model(self, *args):
        global GANN_instance

        self.recv_train_model_btn.disabled = True
        recvThread = RecvThread(kivy_app=self, buffer_size=1024, recv_timeout=10)
        recvThread.start()

    def close_socket(self, *args):
        self.soc.close()
        self.label.text = "Socket Closed"

        self.create_socket_btn.disabled = False
        self.connect_btn.disabled = True
        self.recv_train_model_btn.disabled = True
        self.close_socket_btn.disabled = True

    def build(self):
        self.create_socket_btn = kivy.uix.button.Button(text="Create Socket")
        self.create_socket_btn.bind(on_press=self.create_socket)

        self.server_ip = kivy.uix.textinput.TextInput(hint_text="Server IPv4 Address", text="localhost")
        self.server_port = kivy.uix.textinput.TextInput(hint_text="Server Port Number", text="10000")

        self.server_info_boxlayout = kivy.uix.boxlayout.BoxLayout(orientation="horizontal")
        self.server_info_boxlayout.add_widget(self.server_ip)
        self.server_info_boxlayout.add_widget(self.server_port)

        self.connect_btn = kivy.uix.button.Button(text="Connect to Server", disabled=True)
        self.connect_btn.bind(on_press=self.connect)

        self.recv_train_model_btn = kivy.uix.button.Button(text="Receive & Train Model", disabled=True)
        self.recv_train_model_btn.bind(on_press=self.recv_train_model)

        self.close_socket_btn = kivy.uix.button.Button(text="Close Socket", disabled=True)
        self.close_socket_btn.bind(on_press=self.close_socket)

        self.label = kivy.uix.label.Label(text="Socket Status")

        self.box_layout = kivy.uix.boxlayout.BoxLayout(orientation="vertical")
        self.box_layout.add_widget(self.create_socket_btn)
        self.box_layout.add_widget(self.server_info_boxlayout)
        self.box_layout.add_widget(self.connect_btn)
        self.box_layout.add_widget(self.recv_train_model_btn)
        self.box_layout.add_widget(self.close_socket_btn)
        self.box_layout.add_widget(self.label)

        return self.box_layout

clientApp = ClientApp()
clientApp.title = "Client App"
clientApp.run()

Send and Receive Data in a Thread

Using the threading module, a thread is created to send and receive data between the client and the server. The following code creates a new class named RecvThread, which extends the threading.Thread class. Its constructor accepts 3 parameters:

  1. kivy_app: A reference to the Kivy app instance.
  2. buffer_size: The buffer size.
  3. recv_timeout: The timeout, after which the connection will be closed if no data is received from the server.
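To see how buffer_size and recv_timeout affect a read, here is a minimal standalone sketch. It assumes a pair of locally connected sockets (socket.socketpair()) in place of a real client and server, and recv_message() is a hypothetical helper rather than part of the project. It follows the same (data, status) convention as the tutorial and additionally returns the number of chunks read.

```python
import pickle
import socket

def recv_message(sock, buffer_size, recv_timeout):
    """Read chunks of up to buffer_size bytes until a complete pickle arrives.

    Returns (message, 1, chunks) on success, or (None, 0, chunks) on a timeout
    or when the peer closes the connection.
    """
    sock.settimeout(recv_timeout)
    received = b""
    chunks = 0
    # Every pickle ends with b"." (the STOP opcode). Like the tutorial's loop,
    # this check can stop early if a chunk happens to end with that byte.
    while not received.endswith(b"."):
        try:
            chunk = sock.recv(buffer_size)
        except socket.timeout:
            return None, 0, chunks
        if not chunk:  # An empty read means the peer closed the connection.
            return None, 0, chunks
        received += chunk
        chunks += 1
    return pickle.loads(received), 1, chunks

# A connected pair of local sockets stands in for the server and the client.
server_side, client_side = socket.socketpair()

# The pickled message is larger than buffer_size, so several reads are needed.
server_side.sendall(pickle.dumps({"subject": "echo", "data": "x" * 3000}))
message, status, chunks = recv_message(client_side, buffer_size=1024, recv_timeout=1)
print(status, chunks, message["subject"])

# Nothing else was sent, so the next read gives up after recv_timeout seconds.
timed_out = recv_message(client_side, buffer_size=1024, recv_timeout=0.2)
print(timed_out)

server_side.close()
client_side.close()
```

A larger buffer_size means fewer calls to recv() per message, while recv_timeout bounds how long the thread waits before giving up on the connection.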

The implementation of the nn and gann modules is available at the Federated Learning GitHub project under the TutorialProject/Part4 directory.

import socket
import pickle
import numpy
import threading

import pygad
import nn
import gann

class RecvThread(threading.Thread):

    def __init__(self, kivy_app, buffer_size, recv_timeout):
        threading.Thread.__init__(self)
        self.kivy_app = kivy_app
        self.buffer_size = buffer_size
        self.recv_timeout = recv_timeout

    def recv(self):
        received_data = b""
        while str(received_data)[-2] != '.':
            try:
                self.kivy_app.soc.settimeout(self.recv_timeout)
                received_data += self.kivy_app.soc.recv(self.buffer_size)
            except socket.timeout:
                print("A socket.timeout exception occurred because the server did not send any data for {recv_timeout} seconds.".format(recv_timeout=self.recv_timeout))
                self.kivy_app.label.text = "{recv_timeout} Seconds of Inactivity. socket.timeout Exception Occurred".format(recv_timeout=self.recv_timeout)
                return None, 0
            except BaseException as e:
                print("Error While Receiving Data from the Server: {msg}.".format(msg=e))
                self.kivy_app.label.text = "Error While Receiving Data from the Server"
                return None, 0

        try:
            received_data = pickle.loads(received_data)
        except BaseException as e:
            print("Error Decoding the Server's Data: {msg}.\n".format(msg=e))
            self.kivy_app.label.text = "Error Decoding the Server's Data"
            return None, 0
    
        return received_data, 1

    def run(self):
        global GANN_instance

        subject = "echo"
        GANN_instance = None
        best_sol_idx = -1

        while True:
            data = {"subject": subject, "data": GANN_instance, "best_solution_idx": best_sol_idx}
            data_byte = pickle.dumps(data)

            self.kivy_app.label.text = "Sending a Message of Type {subject} to the Server".format(subject=subject)
            try:
                self.kivy_app.soc.sendall(data_byte)
            except BaseException as e:
                self.kivy_app.label.text = "Error Connecting to the Server. The server might have been closed."
                print("Error Connecting to the Server: {msg}".format(msg=e))
                break

            self.kivy_app.label.text = "Receiving Reply from the Server"
            received_data, status = self.recv()
            if status == 0:
                self.kivy_app.label.text = "Nothing Received from the Server"
                break
            else:
                self.kivy_app.label.text = "New Message from the Server"

            subject = received_data["subject"]
            if subject == "model":
                GANN_instance = received_data["data"]
            elif subject == "done":
                self.kivy_app.label.text = "Model is Trained"
                break
            else:
                self.kivy_app.label.text = "Unrecognized Message Type: {subject}".format(subject=subject)
                break

            ga_instance = prepare_GA(GANN_instance)

            ga_instance.run()

            subject = "model"
            best_sol_idx = ga_instance.best_solution()[2]

The run() method has an infinite loop that receives the model from the server, trains it using the client’s local data, and then sends the trained model back to the server. The loop ends when the server replies with a "done" message or when an error occurs.
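The message flow of this loop can be traced without sockets or a real model. The sketch below is a simplification under stated assumptions: fake_server() and fake_train() are hypothetical stand-ins, and the "model" is just a dictionary holding an error value that drops by one per training round. Only the subject values ("echo", "model", "done") and the use of pickle come from the tutorial.

```python
import pickle

def fake_server(request_bytes):
    """Hypothetical stand-in for the server: keep replying with the model until its error is 0."""
    request = pickle.loads(request_bytes)
    model = request["data"]
    if request["subject"] == "echo":
        # First contact: hand out a fresh model.
        reply = {"subject": "model", "data": {"error": 2}}
    elif model["error"] != 0:
        reply = {"subject": "model", "data": model}
    else:
        reply = {"subject": "done", "data": None}
    return pickle.dumps(reply)

def fake_train(model):
    """Hypothetical stand-in for local training: each round lowers the error by one."""
    return {"error": max(model["error"] - 1, 0)}

def client_loop():
    """Mirror the structure of RecvThread.run() and log (sent, received) subjects."""
    subject, model, log = "echo", None, []
    while True:
        request = pickle.dumps({"subject": subject, "data": model})
        reply = pickle.loads(fake_server(request))
        log.append((subject, reply["subject"]))
        if reply["subject"] != "model":
            break  # "done" (or anything unrecognized) ends the loop.
        model = fake_train(reply["data"])
        subject = "model"
    return log

print(client_loop())
```

The first message is always an "echo", every later message carries the locally trained model, and the exchange stops as soon as the server answers "done".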

Complete Client Code

The complete code of the client is listed below. The code is available in the TutorialProject/Part4/client1.py and TutorialProject/Part4/client2.py files under the Federated Learning GitHub project.

import socket
import pickle
import numpy

import pygad
import nn
import gann

import kivy.app
import kivy.uix.button
import kivy.uix.label
import kivy.uix.boxlayout
import kivy.uix.textinput

import threading

class ClientApp(kivy.app.App):
    
    def __init__(self):
        super().__init__()

    def create_socket(self, *args):
        self.soc = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        self.label.text = "Socket Created"

        self.create_socket_btn.disabled = True
        self.connect_btn.disabled = False
        self.close_socket_btn.disabled = False

    def connect(self, *args):
        try:
            self.soc.connect((self.server_ip.text, int(self.server_port.text)))
            self.label.text = "Successful Connection to the Server"
    
            self.connect_btn.disabled = True
            self.recv_train_model_btn.disabled = False

        except BaseException as e:
            self.label.text = "Error Connecting to the Server"
            print("Error Connecting to the Server: {msg}".format(msg=e))

            self.connect_btn.disabled = False
            self.recv_train_model_btn.disabled = True

    def recv_train_model(self, *args):
        global GANN_instance

        self.recv_train_model_btn.disabled = True
        recvThread = RecvThread(kivy_app=self, buffer_size=1024, recv_timeout=10)
        recvThread.start()

    def close_socket(self, *args):
        self.soc.close()
        self.label.text = "Socket Closed"

        self.create_socket_btn.disabled = False
        self.connect_btn.disabled = True
        self.recv_train_model_btn.disabled = True
        self.close_socket_btn.disabled = True

    def build(self):
        self.create_socket_btn = kivy.uix.button.Button(text="Create Socket")
        self.create_socket_btn.bind(on_press=self.create_socket)

        self.server_ip = kivy.uix.textinput.TextInput(hint_text="Server IPv4 Address", text="localhost")
        self.server_port = kivy.uix.textinput.TextInput(hint_text="Server Port Number", text="10000")

        self.server_info_boxlayout = kivy.uix.boxlayout.BoxLayout(orientation="horizontal")
        self.server_info_boxlayout.add_widget(self.server_ip)
        self.server_info_boxlayout.add_widget(self.server_port)

        self.connect_btn = kivy.uix.button.Button(text="Connect to Server", disabled=True)
        self.connect_btn.bind(on_press=self.connect)

        self.recv_train_model_btn = kivy.uix.button.Button(text="Receive & Train Model", disabled=True)
        self.recv_train_model_btn.bind(on_press=self.recv_train_model)

        self.close_socket_btn = kivy.uix.button.Button(text="Close Socket", disabled=True)
        self.close_socket_btn.bind(on_press=self.close_socket)

        self.label = kivy.uix.label.Label(text="Socket Status")

        self.box_layout = kivy.uix.boxlayout.BoxLayout(orientation="vertical")
        self.box_layout.add_widget(self.create_socket_btn)
        self.box_layout.add_widget(self.server_info_boxlayout)
        self.box_layout.add_widget(self.connect_btn)
        self.box_layout.add_widget(self.recv_train_model_btn)
        self.box_layout.add_widget(self.close_socket_btn)
        self.box_layout.add_widget(self.label)

        return self.box_layout

def fitness_func(solution, sol_idx):
    global GANN_instance, data_inputs, data_outputs

    predictions = nn.predict(last_layer=GANN_instance.population_networks[sol_idx],
                                           data_inputs=data_inputs)
    correct_predictions = numpy.where(predictions == data_outputs)[0].size
    solution_fitness = (correct_predictions/data_outputs.size)*100

    return solution_fitness

def callback_generation(ga_instance):
    global GANN_instance, last_fitness

    population_matrices = gann.population_as_matrices(population_networks=GANN_instance.population_networks, 
                                                            population_vectors=ga_instance.population)

    GANN_instance.update_population_trained_weights(population_trained_weights=population_matrices)

def prepare_GA(GANN_instance):
    population_vectors = gann.population_as_vectors(population_networks=GANN_instance.population_networks)
    
    initial_population = population_vectors.copy()
    
    num_parents_mating = 4 # Number of solutions to be selected as parents in the mating pool.
    
    num_generations = 500 # Number of generations.
    
    mutation_percent_genes = 5 # Percentage of genes to mutate. This parameter has no action if the parameter mutation_num_genes exists.
    
    parent_selection_type = "sss" # Type of parent selection.
    
    crossover_type = "single_point" # Type of the crossover operator.
    
    mutation_type = "random" # Type of the mutation operator.
    
    keep_parents = 1 # Number of parents to keep in the next population. -1 means keep all parents and 0 means keep nothing.
    
    init_range_low = -2
    init_range_high = 5
    
    ga_instance = pygad.GA(num_generations=num_generations, 
                           num_parents_mating=num_parents_mating, 
                           initial_population=initial_population,
                           fitness_func=fitness_func,
                           mutation_percent_genes=mutation_percent_genes,
                           init_range_low=init_range_low,
                           init_range_high=init_range_high,
                           parent_selection_type=parent_selection_type,
                           crossover_type=crossover_type,
                           mutation_type=mutation_type,
                           keep_parents=keep_parents,
                           callback_generation=callback_generation)

    return ga_instance

# Preparing the NumPy array of the inputs.
data_inputs = numpy.array([[0, 1],
                           [0, 0]])

# Preparing the NumPy array of the outputs.
data_outputs = numpy.array([1, 
                            0])

class RecvThread(threading.Thread):

    def __init__(self, kivy_app, buffer_size, recv_timeout):
        threading.Thread.__init__(self)
        self.kivy_app = kivy_app
        self.buffer_size = buffer_size
        self.recv_timeout = recv_timeout

    def recv(self):
        received_data = b""
        while str(received_data)[-2] != '.':
            try:
                self.kivy_app.soc.settimeout(self.recv_timeout)
                received_data += self.kivy_app.soc.recv(self.buffer_size)
            except socket.timeout:
                print("A socket.timeout exception occurred because the server did not send any data for {recv_timeout} seconds.".format(recv_timeout=self.recv_timeout))
                self.kivy_app.label.text = "{recv_timeout} Seconds of Inactivity. socket.timeout Exception Occurred".format(recv_timeout=self.recv_timeout)
                return None, 0
            except BaseException as e:
                print("Error While Receiving Data from the Server: {msg}.".format(msg=e))
                self.kivy_app.label.text = "Error While Receiving Data from the Server"
                return None, 0

        try:
            received_data = pickle.loads(received_data)
        except BaseException as e:
            print("Error Decoding the Server's Data: {msg}.\n".format(msg=e))
            self.kivy_app.label.text = "Error Decoding the Server's Data"
            return None, 0
    
        return received_data, 1

    def run(self):
        global GANN_instance

        subject = "echo"
        GANN_instance = None
        best_sol_idx = -1

        while True:
            data = {"subject": subject, "data": GANN_instance, "best_solution_idx": best_sol_idx}
            data_byte = pickle.dumps(data)

            self.kivy_app.label.text = "Sending a Message of Type {subject} to the Server".format(subject=subject)
            try:
                self.kivy_app.soc.sendall(data_byte)
            except BaseException as e:
                self.kivy_app.label.text = "Error Connecting to the Server. The server might have been closed."
                print("Error Connecting to the Server: {msg}".format(msg=e))
                break

            self.kivy_app.label.text = "Receiving Reply from the Server"
            received_data, status = self.recv()
            if status == 0:
                self.kivy_app.label.text = "Nothing Received from the Server"
                break
            else:
                self.kivy_app.label.text = "New Message from the Server"

            subject = received_data["subject"]
            if subject == "model":
                GANN_instance = received_data["data"]
            elif subject == "done":
                self.kivy_app.label.text = "Model is Trained"
                break
            else:
                self.kivy_app.label.text = "Unrecognized Message Type: {subject}".format(subject=subject)
                break

            ga_instance = prepare_GA(GANN_instance)

            ga_instance.run()

            subject = "model"
            best_sol_idx = ga_instance.best_solution()[2]

clientApp = ClientApp()
clientApp.title = "Client App"
clientApp.run()

At this point, the GUI apps of the server and the client are complete. The next section discusses building Android apps out of them.

Building an Android App

The python-for-android project builds an Android app out of a Kivy app: it takes the Kivy app as input and produces an APK as output. Buildozer is a tool that automates the production of the APK file. Using just a single command like buildozer android debug, an Android APK is created.

Note that python-for-android and Buildozer only work on Linux, so you’ll have to use a Linux platform to continue with this section.

Here’s a summary of the steps required to build the Android app:

  • Create a virtual environment
  • Activate the virtual environment
  • Install Kivy
  • Install Buildozer
  • Create buildozer.spec file
  • Build the Android app

Please check out the tutorial titled Python for Android: Start Building Kivy Cross-Platform Applications for details about each of these steps.

For more information, check the book titled Building Android Apps in Python Using Kivy with Android Studio.

Once a Python virtual environment is created in which Kivy and Buildozer are installed, please do the following:

  • Rename the main Python file of the application to main.py. Kivy uses this file as the main activity of the Android app.
  • Activate the virtual environment and create the buildozer.spec file by running the buildozer init command.

This command should be executed within the same directory in which the main.py file exists.

Inside the created buildozer.spec file, add numpy to the value assigned to the requirements field. Its value should be kivy,numpy. This will make the NumPy library available in the Android app.

  • Run the buildozer android debug command to start building the Android app. This command takes quite a bit of time when run for the first time.
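Because the first build can take a long time, it may be worth checking the requirements line of buildozer.spec before starting it. The sketch below is one possible way to do that: spec_requirements() is a hypothetical helper, and sample_spec is a shortened, made-up stand-in for the file that Buildozer generates. In practice, the text would be read from the real buildozer.spec file.

```python
import re

def spec_requirements(spec_text):
    """Return the packages listed on the 'requirements' line of a buildozer.spec text."""
    match = re.search(r"^requirements\s*=\s*(.*)$", spec_text, flags=re.MULTILINE)
    if match is None:
        return []
    return [item.strip() for item in match.group(1).split(",") if item.strip()]

# Shortened, made-up example of a spec file (assumption: the real file has many more fields).
sample_spec = """[app]
title = Client App
source.dir = .
requirements = kivy,numpy
"""

# The client app needs both Kivy and NumPy to be packaged into the APK.
missing = {"kivy", "numpy"} - set(spec_requirements(sample_spec))
print("Missing requirements:", sorted(missing))
```

An empty list means both packages are listed, so the build can be started.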

Note that the build process isn’t always straightforward, and errors might come up along the way. If that happens, try to locate the source of the error and search for a solution.

By following these steps, an APK is produced that can be installed on Android devices.

The links for the APK files of both the server and the client Android apps are given below:

The next figure shows how the client app looks when installed in an Android device.

Conclusion

In the 4th part of the federated learning demo project, we created GUIs for both the server and the client using Kivy. We then built a simple Android app out of the client Kivy app, which allows a machine learning model to be trained using federated learning based on the data available on an Android device.

