With PySpark, I would like to join/merge dataframe A with dataframe B whenever an IP address in A either falls within an IP network range in B or matches an IP address in B exactly.
Dataframe A contains only IP addresses, while dataframe B contains IP addresses, some of them with a CIDR suffix. Here is an example.
Dataframe A
+---------------+
| ip_address|
+---------------+
| 192.0.2.2|
| 164.42.155.5|
| 52.95.245.0|
| 66.42.224.235|
| ...|
+---------------+
Dataframe B
+---------------+
| ip_address|
+---------------+
| 123.122.213.34|
| 41.32.241.2|
| 66.42.224.235|
| 192.0.2.0/23|
| ...|
+---------------+
Then the expected output would be something like this:
+---------------+--------+
| ip_address| is_in_b|
+---------------+--------+
| 192.0.2.2| true| -> This is in the same network range as 192.0.2.0/23
| 164.42.155.5| false|
| 52.95.245.0| false|
| 66.42.224.235| true| -> This is in B
| ...| ...|
+---------------+--------+
The idea I first wanted to try was a UDF that compares the rows one by one and checks the network range whenever a CIDR comes up, but it seems a UDF can't take two dataframes as input. I also tried converting dataframe B to a list and comparing against it, but that is very inefficient and takes a long time, since the product of A's and B's row counts is over 100 million. Is there any efficient solution?
Edit: For more detail, I used the following code to do the check without PySpark or any library.
def cidr_to_netmask(c):
    # Build a dotted-quad netmask from a CIDR prefix length, e.g. 23 -> 255.255.254.0
    cidr = int(c)
    mask = (0xffffffff >> (32 - cidr)) << (32 - cidr)
    return (str((0xff000000 & mask) >> 24) + '.' +
            str((0x00ff0000 & mask) >> 16) + '.' +
            str((0x0000ff00 & mask) >> 8) + '.' +
            str(0x000000ff & mask))

def ip_to_numeric(ip):
    # Convert a dotted-quad IP string to its 32-bit integer value
    ip_num = 0
    for i, octet in enumerate(ip.split('.')):
        ip_num += int(octet) << (24 - (8 * i))
    return ip_num

def is_in_ip_network(ip, network_addr):
    # No '/' means a plain address: compare for equality.
    # Otherwise mask both addresses with the netmask and compare network parts.
    if len(network_addr.split('/')) < 2:
        return ip == network_addr.split('/')[0]
    else:
        network_ip, cidr = network_addr.split('/')
        subnet = cidr_to_netmask(cidr)
        return (ip_to_numeric(ip) & ip_to_numeric(subnet)) == (ip_to_numeric(network_ip) & ip_to_numeric(subnet))
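For example, checking a couple of values from the tables above with these helpers (expected output shown in the comments):

# Quick sanity check of the helpers against the example data
print(is_in_ip_network("192.0.2.2", "192.0.2.0/23"))       # True: inside the /23
print(is_in_ip_network("66.42.224.235", "66.42.224.235"))  # True: exact match
print(is_in_ip_network("164.42.155.5", "192.0.2.0/23"))    # False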
You could use crossJoin and UDFs, but at the cost of a Cartesian product:
from pyspark.sql import *
from pyspark.sql.functions import udf
from pyspark.sql.types import *

data_1 = ["192.0.2.2", "164.42.155.5", "52.95.245.0", "66.42.224.235"]
data_2 = ["192.0.2.0/23", "66.42.224.235"]
DF1 = spark.createDataFrame([Row(ip=x) for x in data_1])
DF2 = spark.createDataFrame([Row(ip=x) for x in data_2])

# Wrap the plain-Python check in a UDF and evaluate it for every A x B pair
join_cond = udf(is_in_ip_network, BooleanType())
DF1.crossJoin(DF2).withColumn("match", join_cond(DF1.ip, DF2.ip))
The result looks similar to:
ip             ip             match
192.0.2.2      192.0.2.0/23   true
192.0.2.2      66.42.224.235  false
164.42.155.5   192.0.2.0/23   false
164.42.155.5   66.42.224.235  false
52.95.245.0    192.0.2.0/23   false
52.95.245.0    66.42.224.235  false
66.42.224.235  192.0.2.0/23   false
66.42.224.235  66.42.224.235  true
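The cross join above still evaluates every A x B pair, though. If that is too expensive at your scale, a common way to cut the work down is to handle the plain addresses in B with a cheap equi-join and only range-check against the (usually much smaller) CIDR subset. Below is a sketch reusing the helper functions and DF1/DF2 from above; names such as b_exact, cidr_bounds, and ip_b are mine, not from the question:

from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType, StructType, StructField

# Split B into plain addresses and CIDR networks
b_exact = DF2.filter(~DF2.ip.contains("/")).withColumnRenamed("ip", "ip_b")
b_cidr = DF2.filter(DF2.ip.contains("/"))

# Exact matches need no UDF: a broadcastable equi-join does the job
exact_hits = DF1.join(b_exact, DF1.ip == b_exact.ip_b, "left_semi")

# Turn each CIDR into a numeric [start, end] range once, then range-join A
# against it; the expensive comparison now only touches the CIDR subset
def cidr_bounds(network_addr):
    network_ip, cidr = network_addr.split('/')
    start = ip_to_numeric(network_ip) & ip_to_numeric(cidr_to_netmask(cidr))
    end = start + (1 << (32 - int(cidr))) - 1
    return (start, end)

bounds = udf(cidr_bounds, StructType([
    StructField("start", LongType()), StructField("end", LongType())]))
ip_num = udf(ip_to_numeric, LongType())

ranges = b_cidr.select(bounds("ip").alias("r")).select("r.start", "r.end")
range_hits = (DF1.withColumn("ip_num", ip_num("ip"))
              .join(ranges, (F.col("ip_num") >= F.col("start")) &
                            (F.col("ip_num") <= F.col("end")), "left_semi")
              .drop("ip_num"))

matches = exact_hits.union(range_hits).distinct()

Both joins are left-semi, so matches is the subset of A found in B; to produce the boolean is_in_b column from the question, you could instead left-join and flag the rows that matched.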