How can I use threadlocal variable with ThreadPoolExecutor?

1

I want to threads has some local variable, with thread.Thread it can be done like this elegantly:

class TTT(threading.Thread):
    def __init__(self, lines, ip, port):
        threading.Thread.__init__(self)
        self._lines = lines;
        self._sock = initsock(ip, port)
        self._sts = 0
        self._cts = 0

    def run(self):
        for line in self._lines:
            query = genquery(line)
            length = len(query)
            head = "0xFFFFFFFE"
            q = struct.pack('II%ds'%len(query),  head,  length, query)
            sock.send(q)
            sock.recv(4)
            length,  = struct.unpack('I',  sock.recv(4))
            result = ''
            remain = length
            while remain:
                t = sock.recv(remain)
                result+=t
                remain-=len(t)
            print(result)

As you can see that _lines _sock _sts _cts these variable will be independent in every thread.

But with concurrent.future.ThreadPoolExecutor, it seems that it's not that easy. With ThreadPoolExecutor, how can I make things elegantly?(no more global variables)


New Edited

class Processor(object):
    def __init__(self, host, port):
        self._sock = self._init_sock(host, port)

    def __call__(self, address, adcode):
        self._send_data(address, adcode)
        result = self._recv_data()
        return json.loads(result)

def main():
    args = parse_args()
    adcode = {"shenzhen": 440300}[args.city]

    if args.output:
        fo = open(args.output, "w", encoding="utf-8")
    else:
        fo = sys.stdout
    with open(args.file, encoding=args.encoding) as fi, fo,\
        ThreadPoolExecutor(max_workers=args.processes) as executor:
        reader = csv.DictReader(fi)
        writer = csv.DictWriter(fo, reader.fieldnames + ["crfterm"])
        test_set = AddressIter(args.file, args.field, args.encoding)
        func = Processor(args.host, args.port)
        futures = map(lambda x: executor.submit(func, x, adcode), test_set)
        for row, future in zip(reader, as_completed(futures)):
            result = future.result()
            row["crfterm"] = join_segs_tags(result["segs"], result["tags"])
            writer.writerow(row)
python
multithreading
threadpoolexecutor
asked on Stack Overflow Aug 26, 2017 by roger • edited Aug 26, 2017 by roger

1 Answer

1

Using a layout very similar to what you have now would be the easiest thing. Instead of a Thread, have a normal object, and instead of run, implement your logic in __call__:

class TTT:
    def __init__(self, lines, ip, port):
        self._lines = lines;
        self._sock = initsock(ip, port)
        self._sts = 0
        self._cts = 0

    def __call__(self):
        ...
        # do stuff to self

Adding a __call__ method to a class makes it possible to invoke instances as if they were regular functions. In fact, normal functions are objects with such a method. You can now pass a bunch of TTT instances to either map or submit.

Alternatively, you could absorb the initialization into your task function:

def ttt(lines, ip, port):
    sock = initsock(ip, port)
    sts = cts = 0
    ...

Now you can call submit with the correct parameter list or map with an iterable of values for each parameter.

I would prefer the former approach for this example because it opens the port outside the executor. Error reporting in executor tasks can be tricky sometimes, and I would prefer to make the error prone operation of opening a port as transparent as possible.

EDIT

Based on your related question, I believe that the real question you are asking is about function-local variables (which are automatically thread-local as well), not being shared between function calls on the same thread. However, you can always pass references between function calls.

answered on Stack Overflow Aug 26, 2017 by Mad Physicist • edited Aug 28, 2017 by Mad Physicist

User contributions licensed under CC BY-SA 3.0