forked from micropython/micropython-lib
- Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_pipe.py
24 lines (18 loc) · 567 Bytes
/
test_pipe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
importsys
importos
frommultiprocessingimportProcess, Pipe
deff(conn):
conn.send([42, None, "hello"])
conn.send([42, 42, 42])
conn.close()
if__name__=="__main__":
parent_conn, child_conn=Pipe(False)
# print(parent_conn, child_conn)
p=Process(target=f, args=(child_conn,))
# Extension: need to call this for uPy
ifsys.implementation.name=="micropython":
p.register_pipe(parent_conn, child_conn)
p.start()
parent_conn.recv() == [42, None, "hello"]
parent_conn.recv() == [42, 42, 42]
p.join()