I want each thread to have its own local variables. With threading.Thread, this can be done elegantly like this:
import struct
import threading

class TTT(threading.Thread):
    def __init__(self, lines, ip, port):
        threading.Thread.__init__(self)
        self._lines = lines
        self._sock = initsock(ip, port)
        self._sts = 0
        self._cts = 0

    def run(self):
        for line in self._lines:
            query = genquery(line)  # must be bytes for struct's 's' format
            length = len(query)
            head = 0xFFFFFFFE  # an integer, since it is packed with 'I'
            q = struct.pack('II%ds' % length, head, length, query)
            self._sock.send(q)
            self._sock.recv(4)  # discard the first 4 bytes of the reply
            length, = struct.unpack('I', self._sock.recv(4))
            result = b''
            remain = length
            while remain:
                t = self._sock.recv(remain)
                result += t
                remain -= len(t)
            print(result)
As you can see, the variables _lines, _sock, _sts, and _cts are independent in every thread.
But with concurrent.futures.ThreadPoolExecutor, it does not seem to be that easy. How can I do this elegantly with ThreadPoolExecutor, without resorting to global variables?
Edit: new code
class Processor(object):
    def __init__(self, host, port):
        self._sock = self._init_sock(host, port)

    def __call__(self, address, adcode):
        self._send_data(address, adcode)
        result = self._recv_data()
        return json.loads(result)

def main():
    args = parse_args()
    adcode = {"shenzhen": 440300}[args.city]
    if args.output:
        fo = open(args.output, "w", encoding="utf-8")
    else:
        fo = sys.stdout
    with open(args.file, encoding=args.encoding) as fi, fo,\
            ThreadPoolExecutor(max_workers=args.processes) as executor:
        reader = csv.DictReader(fi)
        writer = csv.DictWriter(fo, reader.fieldnames + ["crfterm"])
        test_set = AddressIter(args.file, args.field, args.encoding)
        func = Processor(args.host, args.port)
        futures = map(lambda x: executor.submit(func, x, adcode), test_set)
        for row, future in zip(reader, as_completed(futures)):
            result = future.result()
            row["crfterm"] = join_segs_tags(result["segs"], result["tags"])
            writer.writerow(row)
Using a layout very similar to what you have now would be the easiest thing. Instead of a Thread, have a normal object, and instead of run, implement your logic in __call__:
class TTT:
    def __init__(self, lines, ip, port):
        self._lines = lines
        self._sock = initsock(ip, port)
        self._sts = 0
        self._cts = 0

    def __call__(self):
        ...
        # do stuff to self
Adding a __call__ method to a class makes it possible to invoke instances as if they were regular functions. In fact, normal functions are objects with such a method. You can now pass a bunch of TTT instances to either map or submit.
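As a rough sketch of that idea, the snippet below uses a hypothetical Task class in place of TTT. It does no socket work, and its _count attribute is invented purely for illustration. It assumes that submit can take a callable instance directly, while map needs a small wrapper that calls each instance:

```python
from concurrent.futures import ThreadPoolExecutor


class Task:
    """Hypothetical stand-in for TTT: each instance owns its own state."""

    def __init__(self, lines):
        self._lines = lines
        self._count = 0  # per-instance state, not shared with other tasks

    def __call__(self):
        # Only this instance's attributes are touched here.
        for line in self._lines:
            self._count += len(line)
        return self._count


tasks = [Task(["ab", "c"]), Task(["defg"]), Task([])]
fresh = [Task(["ab", "c"]), Task(["defg"])]

with ThreadPoolExecutor(max_workers=2) as executor:
    # submit accepts any callable, so an instance with __call__ works as-is.
    futures = [executor.submit(task) for task in tasks]
    submit_results = [f.result() for f in futures]

    # map applies one function to each item, so wrap the call in a lambda.
    map_results = list(executor.map(lambda task: task(), fresh))
```

Because every instance carries its own attributes, no global state is involved, as long as each instance is handed to the executor only once at a time.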
Alternatively, you could absorb the initialization into your task function:
def ttt(lines, ip, port):
    sock = initsock(ip, port)
    sts = cts = 0
    ...
Now you can call submit with the correct parameter list, or map with an iterable of values for each parameter.
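A minimal sketch of both call styles follows. The socket setup is left out, and sts and cts are used here as line and character counters only as an assumption for illustration; the addresses and ports are made up:

```python
from concurrent.futures import ThreadPoolExecutor


def ttt(lines, ip, port):
    # A real task would open its socket here, e.g. with initsock(ip, port).
    # sts and cts are locals, so every call gets its own copies.
    sts = cts = 0
    for line in lines:
        sts += 1          # assumed meaning: number of lines handled
        cts += len(line)  # assumed meaning: number of characters handled
    return (ip, port, sts, cts)


batches = [["a", "bc"], ["def"]]
ips = ["10.0.0.1", "10.0.0.2"]  # made-up addresses
ports = [9000, 9001]            # made-up ports

with ThreadPoolExecutor(max_workers=2) as executor:
    # submit: the function followed by its positional arguments.
    single = executor.submit(ttt, batches[0], ips[0], ports[0]).result()

    # map: one iterable per parameter, consumed in parallel like zip.
    mapped = list(executor.map(ttt, batches, ips, ports))
```

Note that map returns results in the order of the inputs, regardless of which task finishes first.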
I would prefer the former approach for this example because it opens the port outside the executor, in the thread that creates the object. Error reporting in executor tasks can be tricky sometimes, and I would prefer to make the error-prone operation of opening a port as transparent as possible.
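To illustrate why errors inside executor tasks can be easy to miss, here is a small sketch with a hypothetical open_port function that always fails. The exception does not appear at submit time; it is stored on the future and only surfaces when the result is requested:

```python
from concurrent.futures import ThreadPoolExecutor


def open_port(port):
    # Hypothetical stand-in for a connection attempt that fails.
    raise ConnectionError(f"cannot connect to port {port}")


with ThreadPoolExecutor(max_workers=1) as executor:
    # Nothing is raised here, even though the task fails in the worker.
    future = executor.submit(open_port, 9000)

# The worker's exception is stored on the future...
stored = future.exception()

# ...and re-raised in the caller only when result() is called.
try:
    future.result()
    surfaced = None
except ConnectionError as exc:
    surfaced = str(exc)
```

If result() or exception() is never called, the failure passes silently, which is one reason to do the connection setup before handing work to the pool.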
EDIT
Based on your related question, I believe the real question you are asking is about function-local variables (which are automatically thread-local as well) not being shared between function calls on the same thread. However, you can always pass references between function calls.