Problem using ontology in multi-process environment

fabad
Hi, first of all, thanks for owlready2, it's very useful for managing ontologies in Python!

I had a working Python script that processes ontologies to make some calculations, and I wanted to improve its performance by parallelizing it. I am using ProcessPoolExecutor to manage the processes. However, my application hung and did not output anything. After some research and debugging, I figured out that a number of errors were being thrown for each submitted job, but they were silenced in the main application.

It seems that Python tries to serialize/deserialize the ontology object via pickle in order to send a copy of the ontology to each process, but it fails with the following exception:

"""
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: 'NoneType' object is not callable
"""
I've searched on the internet and found that when the object you want to pass as an argument to the processes is not picklable, a PicklingError is thrown; here, however, the ontology seems to be picklable, yet the transfer fails with the error above.

Below is a minimal example that reproduces the error. If you run this code, please replace the "ontology_file_path" variable with the path to an ontology on your file system.

I was using owlready2 0.39, but I updated to 0.43 and the error is still there.

import pathlib
from concurrent.futures import ProcessPoolExecutor
from owlready2 import get_ontology
import time

def get_iri(ontology, job_id):
    return f"job {job_id} -> {ontology.iri}"

if __name__ == '__main__':
    ontology_file_path = pathlib.Path("/home/fabad/test_embed_comp/go.owl")
    print("Loading ontology")
    ontology = get_ontology(f"file://{str(ontology_file_path)}").load()
    print("Ontology loaded")
    executor = ProcessPoolExecutor(max_workers=4)
    results = {}
    for i in range(30):
        results[i] = executor.submit(get_iri, ontology, i)

    time.sleep(10)
    for n, future in results.items():
        if future.exception() is not None:
            raise future.exception()

    executor.shutdown(wait=True)

    for n, future in results.items():
        print(f'{future.result()}')


Has anyone faced this issue before? Do you have any clue on how to solve it?

Thanks in advance,
Francisco Abad.

Re: Problem using ontology in multi-process environment

Jiba
Administrator
Hi,

Ontologies (as well as most Owlready objects) cannot be pickled, because they depend on an SQLite3 connection, which is not picklable.
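
You can actually reproduce the root cause in a few lines, independently of Owlready (just an illustration; the exact error message varies with the Python version):

import pickle
import sqlite3

conn = sqlite3.connect(":memory:")
pickle.dumps(conn)  # TypeError: cannot pickle 'sqlite3.Connection' object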

I think you have two options:

1) Modify your program so that you do not need to pass the ontology as an argument. Typically, each process should have its own World and Ontology, and you can then pass their IRIs as arguments. This can be done either by loading the ontology in each process (see the sketch below), or by using fork() after loading the ontology, so that each process gets a copy of the ontology (but fork() is known to be problematic on Mac).

2) Use GEvent parallelization. This is simpler to set up (and well supported by recent versions of Owlready). However, it does not permit "true" parallelism. If your bottleneck is a calculation in Python, there will be no gain. If it is a computation performed externally (e.g. a call to a remote database or a network download), the gain will be similar to classical multiprocessing.
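
For the first variant of option 1, here is a minimal sketch (the initializer-based setup is just one possibility, not the only way to do it; the file path is the one from your example):

import pathlib
from concurrent.futures import ProcessPoolExecutor
from owlready2 import get_ontology

ONTOLOGY = None  # one ontology per worker process

def init_worker(ontology_path):
    # Runs once in each worker process: load the ontology into that
    # process's own (default) World.
    global ONTOLOGY
    ONTOLOGY = get_ontology(f"file://{ontology_path}").load()

def get_iri(job_id):
    return f"job {job_id} -> {ONTOLOGY.base_iri}"

if __name__ == '__main__':
    path = pathlib.Path("/home/fabad/test_embed_comp/go.owl")
    with ProcessPoolExecutor(max_workers=4, initializer=init_worker,
                             initargs=(str(path),)) as executor:
        futures = [executor.submit(get_iri, i) for i in range(30)]
        for future in futures:
            print(future.result())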

Jiba

Re: Problem using ontology in multi-process environment

fabad
Hi Jiba, thanks for your answer! I did not know how owlready2 works internally.

Finally, as I am working in a shared-memory environment, I was able to open the ontology in the main process and pass its memory address as a parameter to each parallel worker. Each worker then reads the memory address and casts it back to the ontology object. Something like the following (not tested, as I don't have a Python interpreter at hand right now):

import pathlib
from concurrent.futures import ProcessPoolExecutor
from owlready2 import get_ontology
import time
import ctypes

def get_iri(ontology_address, job_id):
    ontology = ctypes.cast(ontology_address).value
    return f"job {job_id} -> {ontology.iri}"

if __name__ == '__main__':
    ontology_file_path = pathlib.Path("/home/fabad/test_embed_comp/go.owl")
    print("Loading ontology")
    ontology = get_ontology(f"file://{str(ontology_file_path)}").load()
    ontology_address = id(ontology)
    print("Ontology loaded")
    executor = ProcessPoolExecutor(max_workers=4)
    results = {}
    for i in range(30):
        results[i] = executor.submit(get_iri, ontology_address, i)

    executor.shutdown(wait=True)

    for n, future in results.items():
        print(f'{future.result()}')


Re: Problem using ontology in multi-process environment

Jiba
Administrator
Hi,

I am very surprised that the ctypes cast can work! Normally, each process has its own memory; thus, one process cannot access the memory of another (it should produce a SEGFAULT).

I tried to run your code, but I get an error:

Traceback (most recent call last):
  File "/usr/lib/python3.11/concurrent/futures/process.py", line 256, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jiba/src/./test7.py", line 33, in get_iri
    ontology = ctypes.cast(ontology_address).value
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: cast() missing 1 required positional argument: 'typ'


I then added the typ argument:

    ontology = ctypes.cast(ontology_address, ctypes.py_object).value

And it seems to work...

After some experimenting, I understood that ProcessPoolExecutor uses fork() (at least under Linux, which I suppose you are using?). fork() duplicates the memory, and thus the ontology. It works well as long as you only read the ontology; if you modify it, each process actually has a different ontology, and a change made in one process will not be visible to the others.
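
For reference, the start method can be made explicit with the standard library (a small sketch):

import multiprocessing
from concurrent.futures import ProcessPoolExecutor

# "fork" is the default on Linux; macOS defaults to "spawn" since Python 3.8,
# and "fork" is not available on Windows.
ctx = multiprocessing.get_context("fork")
executor = ProcessPoolExecutor(max_workers=4, mp_context=ctx)

Note that submitted arguments are pickled regardless of the start method; what fork() buys you is that module-level state, such as an already-loaded ontology, is inherited by the workers.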

You can also use a global variable to transfer the ontology (this is probably less risky than using ctypes: the memory address might not remain the same after forking?).


import pathlib
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

from owlready2 import get_ontology

ONTO = None

def get_iri(job_id):
    ontology = ONTO  # inherited from the parent process at fork time
    print(ontology, multiprocessing.current_process().pid)
    return f"job {job_id} -> {ontology.base_iri}"

if __name__ == '__main__':
    ontology_file_path = pathlib.Path("/tmp/t.owl")
    print("Loading ontology")
    ontology = get_ontology(f"file://{str(ontology_file_path)}").load()
    ONTO = ontology

    print("Ontology loaded")
    executor = ProcessPoolExecutor(max_workers=4)
    results = {}
    for i in range(5):
        results[i] = executor.submit(get_iri, i)

    executor.shutdown(wait=True)

    for n, future in results.items():
        print(f'{future.result()}')



Jiba

Re: Problem using ontology in multi-process environment

fabad
Thanks for your answer, Jiba, and sorry for the error in my code. I wrote my previous comment from memory, as I did not have the code in front of me...

I thought the same as you: each process would have its own memory space. But I tried it, and surprisingly it worked. Actually, I am not sure whether the fork is copying the memory space or whether that space is actually shared between the processes.

To be honest, I think the memory space is being shared between the processes, because the memory usage is not too high. In previous attempts, I passed the ontology file path to each process, and each process opened it, which resulted in high memory consumption. In fact, I could not run more than 5 processes, because my computer ran out of memory as each process was opening the ontology. Now, using the memory address of the ontology, the memory usage remains more or less stable regardless of the number of processes I spawn.

By the way, I am using Linux, Ubuntu specifically. I can do some experiments to check what happens when a process modifies the ontology.

Re: Problem using ontology in multi-process environment

Jiba
Administrator
If I remember well, the memory should be considered as copied. In practice, for performance reasons, it is shared until it is modified (in that case, the affected memory is duplicated just before the first modification occurs).
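
You can observe this copy-on-write behavior with plain os.fork() (a Linux-only illustration, nothing Owlready-specific):

import os

data = ["parent value"]

pid = os.fork()
if pid == 0:
    # Child process: the write below triggers a private copy of the page.
    data[0] = "child value"
    print("child sees:", data[0])
    os._exit(0)
else:
    os.waitpid(pid, 0)
    print("parent sees:", data[0])  # still "parent value"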

Jiba

Re: Problem using ontology in multi-process environment

fabad
Hi again, and sorry for the delay. I've been testing the code, and the memory is not being shared. In the following code, one process creates a new class, and a second one tries to access it after several seconds. The second process cannot see the new class created by the first process:

import pathlib
import ctypes
import types
from concurrent.futures import ProcessPoolExecutor

from owlready2 import get_ontology, Thing
import time



def get_iri(ontology_address, job_id):
    ontology = ctypes.cast(ontology_address, ctypes.py_object).value
    base_iri = ontology.base_iri
    new_class_name = "NewClass"
    new_class_iri = base_iri + new_class_name
    if job_id == 0:
        print(f"Process {job_id} creating a new class.")
        with ontology:
            NewClass = types.new_class(new_class_name, (Thing,))
        if NewClass in list(ontology.search(iri=new_class_iri)):
            print(f"Process {job_id} has created {new_class_iri}")

    if job_id == 1:
        seconds_to_wait = 10
        print(f"Process {job_id} waiting {seconds_to_wait} seconds")
        time.sleep(seconds_to_wait)
        if len(list(ontology.search(iri=new_class_iri))) > 0:
            print(f"Process {job_id} can see {new_class_iri}")
        else:
            print(f"Process {job_id} cannot see {new_class_iri}")
    return f"job {job_id} -> {ontology.name}"

if __name__ == '__main__':
    ontology_file_path = pathlib.Path("/home/fabad/test_embed_comp/go.owl")
    print("Loading ontology")
    ontology = get_ontology(f"file://{str(ontology_file_path)}").load()
    ontology_address = id(ontology)
    print("Ontology loaded")
    executor = ProcessPoolExecutor(max_workers=4)
    results = {}
    for i in range(2):
        results[i] = executor.submit(get_iri, ontology_address, i)

    executor.shutdown(wait=True)

    for n, future in results.items():
        print(f'{future.result()}')


So I cannot explain why the memory usage was low when I used this approach.

Re: Problem using ontology in multi-process environment

Jiba
Administrator
Hi,

The problem is that the quadstore is stored in an SQLite3 database, by default in memory. When processes are created, each one gets a copy of the database; thus, modifying one copy does not alter the others.

In order to share the quadstore between several processes, you need to put the quadstore in a database on disk, activate the flag that allows the database to be shared, and save the quadstore (= commit the database) after each modification.

This can be done by adding the following just after "if __name__ == '__main__':":

    default_world.set_backend(filename = "/tmp/quadstore.sqlite3", exclusive = False)

(NB: you may adapt the filename of the SQLite3 database as you wish; default_world needs to be imported from owlready2.)

And then, after creating the new class, you need to save the quadstore as follows:

        with ontology:
            NewClass = types.new_class(new_class_name, (Thing,))
            default_world.save()


With these changes, your example works as expected on my computer.
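
Put together with your test script, the relevant parts look like this (a condensed sketch; adapt the quadstore path and ontology path as needed):

import ctypes
import pathlib
import types
from concurrent.futures import ProcessPoolExecutor
from owlready2 import get_ontology, Thing, default_world

def create_class(ontology_address, name):
    ontology = ctypes.cast(ontology_address, ctypes.py_object).value
    with ontology:
        types.new_class(name, (Thing,))
        default_world.save()  # commit, so other processes see the change

if __name__ == '__main__':
    # On-disk quadstore, shareable between processes.
    default_world.set_backend(filename="/tmp/quadstore.sqlite3", exclusive=False)
    ontology_file_path = pathlib.Path("/home/fabad/test_embed_comp/go.owl")
    ontology = get_ontology(f"file://{str(ontology_file_path)}").load()
    executor = ProcessPoolExecutor(max_workers=2)
    executor.submit(create_class, id(ontology), "NewClass").result()
    executor.shutdown(wait=True)
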

Jiba