Re: Problem using ontology in multi-process environment

Posted by Jiba on
URL: http://owlready.306.s1.nabble.com/Problem-using-ontology-in-multi-process-environment-tp3198p3215.html

Hi,

I am very surprised that the ctypes cast can work! Normally, each process has its own memory, so one process cannot access the memory of another (it should produce a SEGFAULT).

I tried to run your code, but I get an error:

Traceback (most recent call last):
  File "/usr/lib/python3.11/concurrent/futures/process.py", line 256, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jiba/src/./test7.py", line 33, in get_iri
    ontology = ctypes.cast(ontology_address).value
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: cast() missing 1 required positional argument: 'typ'


I then added the typ argument:

    ontology = ctypes.cast(ontology_address, ctypes.py_object).value

And it seems to work...
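For what it's worth, here is a minimal single-process sketch of what the corrected cast does. It relies on the CPython implementation detail that id() returns the object's memory address; the typ argument tells ctypes how to interpret the memory at that address:

```python
import ctypes

obj = {"name": "test"}  # any Python object
addr = id(obj)          # CPython: id() is the object's memory address

# Interpret the memory at addr as a Python object pointer and
# dereference it; this recovers the original object, not a copy.
recovered = ctypes.cast(addr, ctypes.py_object).value
print(recovered is obj)  # True
```

Of course this only proves the cast works inside one process; across processes it only works because fork() duplicates the address space, as explained below.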

After some experimentation, I understood that ProcessPoolExecutor uses fork() (at least under Linux, which I suppose you are using?). fork() duplicates the memory, and thus the ontology. This works well if you only read the ontology; if you modify it, each process actually has its own copy, so a change made in one process will not be visible to the others.

You can also use a global variable to transfer the ontology (this is possibly less risky than using ctypes: the memory address might not remain the same after forking?).


import pathlib
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from owlready2 import get_ontology

ONTO = None

def get_iri(job_id):
    ontology = ONTO
    print(ontology, multiprocessing.current_process().pid)
    return f"job {job_id} -> {ontology.base_iri}"

if __name__ == '__main__':
    ontology_file_path = pathlib.Path("/tmp/t.owl")
    print("Loading ontology")
    ontology = get_ontology(f"file://{str(ontology_file_path)}").load()
    ONTO = ontology

    print("Ontology loaded")
    executor = ProcessPoolExecutor(max_workers=4)
    results = {}
    for i in range(5):
        results[i] = executor.submit(get_iri, i)

    executor.shutdown(wait = True)

    for n, future in results.items():
        print(f'{future.result()}')



jiba