How can I use thread-local variables with ThreadPoolExecutor?


I want each thread to have its own local variables. With threading.Thread this can be done elegantly, like this:

import struct
import threading

class TTT(threading.Thread):
    def __init__(self, lines, ip, port):
        super().__init__()  # required so the Thread machinery is set up
        self._lines = lines
        self._sock = initsock(ip, port)
        self._sts = 0
        self._cts = 0

    def run(self):
        for line in self._lines:
            query = genquery(line)
            length = len(query)
            head = 0xFFFFFFFE  # must be an int for the 'I' format
            q = struct.pack('II%ds' % length, head, length, query)
            length, = struct.unpack('I', self._sock.recv(4))
            result = ''
            remain = length
            while remain:
                t = self._sock.recv(remain)

As you can see, the variables _lines, _sock, _sts and _cts are independent in every thread.

But with concurrent.futures.ThreadPoolExecutor, it does not seem to be that easy. How can I do the same thing elegantly with ThreadPoolExecutor (without resorting to global variables)?

Edit:

class Processor(object):
    def __init__(self, host, port):
        self._sock = self._init_sock(host, port)

    def __call__(self, address, adcode):
        self._send_data(address, adcode)
        result = self._recv_data()
        return json.loads(result)

def main():
    args = parse_args()
    adcode = {"shenzhen": 440300}[]

    if args.output:
        fo = open(args.output, "w", encoding="utf-8")
    else:
        fo = sys.stdout
    with open(args.file, encoding=args.encoding) as fi, fo,\
        ThreadPoolExecutor(max_workers=args.processes) as executor:
        reader = csv.DictReader(fi)
        writer = csv.DictWriter(fo, reader.fieldnames + ["crfterm"])
        test_set = AddressIter(args.file, args.field, args.encoding)
        func = Processor(, args.port)
        futures = map(lambda x: executor.submit(func, x, adcode), test_set)
        for row, future in zip(reader, as_completed(futures)):
            result = future.result()
            row["crfterm"] = join_segs_tags(result["segs"], result["tags"])

The easiest thing would be to keep a layout very similar to what you have now. Instead of a Thread subclass, use a normal class, and instead of run, implement your logic in __call__:

class TTT:
    def __init__(self, lines, ip, port):
        self._lines = lines
        self._sock = initsock(ip, port)
        self._sts = 0
        self._cts = 0

    def __call__(self):
        # do stuff to self
        pass

Adding a __call__ method to a class makes it possible to invoke instances as if they were regular functions. In fact, normal functions are objects with such a method. You can now pass a bunch of TTT instances to either map or submit.
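
As a rough illustration of the callable-object idea, here is a minimal sketch. LineCounter is a hypothetical stand-in for TTT that avoids opening a socket, so the names and the counting logic are assumptions made only for the example:

```python
from concurrent.futures import ThreadPoolExecutor


class LineCounter:
    """Hypothetical stand-in for TTT: state lives on the instance, not in globals."""

    def __init__(self, lines):
        self._lines = lines
        self._sts = 0  # per-instance counter, mirroring the _sts attribute above

    def __call__(self):
        # Runs on a worker thread and touches only this instance's attributes.
        for line in self._lines:
            self._sts += len(line)
        return self._sts


tasks = [LineCounter(["ab", "cde"]), LineCounter(["f"])]

with ThreadPoolExecutor(max_workers=2) as executor:
    # submit() accepts any callable; each instance is invoked with no arguments.
    futures = [executor.submit(task) for task in tasks]
    totals = [future.result() for future in futures]

print(totals)
```

Because each instance is submitted exactly once, no two threads ever touch the same attributes, which is what makes the state effectively thread-private here.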

Alternatively, you could absorb the initialization into your task function:

def ttt(lines, ip, port):
    sock = initsock(ip, port)
    sts = cts = 0

Now you can call submit with the correct parameter list or map with an iterable of values for each parameter.
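
The two calling conventions might look roughly like this. The body of ttt below is a hypothetical placeholder (it only counts lines instead of talking to a socket), and the addresses and ports are made-up values:

```python
from concurrent.futures import ThreadPoolExecutor


def ttt(lines, ip, port):
    # Hypothetical body: a real task would open its socket here instead.
    sts = cts = 0  # fresh locals on every call
    for line in lines:
        sts += 1
        cts += len(line)
    return ip, port, sts, cts


batches = [["a", "bb"], ["ccc"]]
ips = ["127.0.0.1", "127.0.0.1"]
ports = [9000, 9001]

with ThreadPoolExecutor(max_workers=2) as executor:
    # submit: one call, with the arguments listed after the function.
    single = executor.submit(ttt, batches[0], ips[0], ports[0]).result()
    # map: one iterable per parameter, combined call by call like zip().
    mapped = list(executor.map(ttt, batches, ips, ports))
```

Note that map returns results in the order of the inputs, regardless of which call finishes first.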

I would prefer the former approach for this example because it opens the port outside the executor. Error reporting in executor tasks can be tricky sometimes, and I would prefer to make the error-prone operation of opening a port as transparent as possible.
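
To show why errors inside executor tasks are easy to miss, here is a small sketch. open_port is a hypothetical function that always fails, standing in for a socket that cannot be opened:

```python
from concurrent.futures import ThreadPoolExecutor


def open_port(port):
    # Hypothetical failure standing in for a socket that cannot be opened.
    raise ConnectionError(f"cannot open port {port}")


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(open_port, 9000)

# Nothing has been raised in the main thread yet; the error is stored on the future.
stored = future.exception()

try:
    future.result()  # the stored exception is re-raised here
except ConnectionError as exc:
    message = str(exc)
```

If result() or exception() is never called on the future, the failure passes silently, which is one reason to do fragile setup before handing work to the pool.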


Based on your related question, I believe that what you are really asking about is that function-local variables (which are automatically thread-local as well) are not shared between function calls on the same thread. However, you can always pass references between function calls.
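
A small sketch of that difference, using hypothetical counter functions and a single worker so that every task runs on the same thread:

```python
from concurrent.futures import ThreadPoolExecutor


def count_once():
    seen = 0  # function-local: created anew on every call, even on the same thread
    seen += 1
    return seen


def count_shared(state):
    # `state` is a reference passed in by the caller, so updates persist across calls.
    state["seen"] += 1
    return state["seen"]


state = {"seen": 0}

with ThreadPoolExecutor(max_workers=1) as executor:
    # Each task is awaited before the next is submitted, so the calls run in sequence.
    fresh = [executor.submit(count_once).result() for _ in range(3)]
    kept = [executor.submit(count_shared, state).result() for _ in range(3)]

print(fresh, kept)
```

With more than one worker, an object shared this way can be modified by several threads at once, so it would need a lock or a separate object per task.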

