Check if an IP address is in a IPNetwork with Pyspark
With PySpark, I would like to flag each IP address in dataframe A as matching if it either equals an IP address in dataframe B or falls inside one of B's IP network ranges.
Dataframe A contains IP addresses only, while dataframe B contains IP addresses or networks in CIDR notation. Here is an example.
Dataframe A
+---------------+
| ip_address|
+---------------+
| 192.0.2.2|
| 164.42.155.5|
| 52.95.245.0|
| 66.42.224.235|
| ...|
+---------------+
Dataframe B
+---------------+
| ip_address|
+---------------+
| 123.122.213.34|
| 41.32.241.2|
| 66.42.224.235|
| 192.0.2.0/23|
| ...|
+---------------+
Then the expected output is something like below:
+---------------+--------+
| ip_address| is_in_b|
+---------------+--------+
| 192.0.2.2| true| -> This is in the same network range as 192.0.2.0/23
| 164.42.155.5| false|
| 52.95.245.0| false|
| 66.42.224.235| true| -> This is in B
| ...| ...|
+---------------+--------+
The idea I first wanted to try is a UDF that compares rows one by one and checks the IP range when a CIDR comes up, but it seems UDFs cannot operate across multiple dataframes. I also tried converting dataframe B to a list and comparing against it. However, that is very inefficient and takes a long time, as A's row count times B's row count is over 100 million. Is there any efficient solution?
Edited:
For more detailed information, here is the code I used to check membership without PySpark or any library.
def cidr_to_netmask(c):
    cidr = int(c)
    mask = (0xffffffff >> (32 - cidr)) << (32 - cidr)
    return (str((0xff000000 & mask) >> 24) + '.' +
            str((0x00ff0000 & mask) >> 16) + '.' +
            str((0x0000ff00 & mask) >> 8) + '.' +
            str(0x000000ff & mask))

def ip_to_numeric(ip):
    ip_num = 0
    for i, octet in enumerate(ip.split('.')):
        ip_num += int(octet) << (24 - (8 * i))
    return ip_num

def is_in_ip_network(ip, network_addr):
    if len(network_addr.split('/')) < 2:
        return ip == network_addr.split('/')[0]
    else:
        network_ip, cidr = network_addr.split('/')
        subnet = cidr_to_netmask(cidr)
        return (ip_to_numeric(ip) & ip_to_numeric(subnet)) == (ip_to_numeric(network_ip) & ip_to_numeric(subnet))
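For what it's worth, Python's standard-library ipaddress module does the same containment check without any manual bit arithmetic. A drop-in sketch of the helper above, using only the stdlib:

```python
import ipaddress

def is_in_ip_network(ip, network_addr):
    """Return True if ip equals network_addr or falls inside its CIDR range."""
    if "/" in network_addr:
        # strict=False tolerates host bits set in the network address
        return ipaddress.ip_address(ip) in ipaddress.ip_network(network_addr, strict=False)
    return ip == network_addr

print(is_in_ip_network("192.0.2.2", "192.0.2.0/23"))    # True
print(is_in_ip_network("52.95.245.0", "192.0.2.0/23"))  # False
```

This only replaces the per-pair check; it does not by itself avoid the cross-join cost.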
@pault I just wanted to show an output example, so I modified the sample dataframes. For a network range, gist.github.com/tott/7684443 is a good example, even though it is in PHP. In Python, I usually use the netaddr library to do the same thing, like IPAddress(x) in IPNetwork(y). netaddr.readthedocs.io/en/latest/tutorial_01.html
– andrewshih
Jun 28 at 16:14
1 Answer
You could use crossJoin and a udf, but at the cost of a Cartesian product:
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

data_1 = ["192.0.2.2", "164.42.155.5", "52.95.245.0", "66.42.224.235"]
data_2 = ["192.0.2.0/23", "66.42.224.235"]
DF1 = spark.createDataFrame([Row(ip=x) for x in data_1])
DF2 = spark.createDataFrame([Row(ip=x) for x in data_2])

# is_in_ip_network is the pure-Python helper defined in the question
join_cond = udf(is_in_ip_network, BooleanType())
DF1.crossJoin(DF2).withColumn("match", join_cond(DF1.ip, DF2.ip))
The result looks similar to
ip ip match
192.0.2.2 192.0.2.0/23 true
192.0.2.2 66.42.224.235 false
164.42.155.5 192.0.2.0/23 false
164.42.155.5 66.42.224.235 false
52.95.245.0 192.0.2.0/23 false
52.95.245.0 66.42.224.235 false
66.42.224.235 192.0.2.0/23 false
66.42.224.235 66.42.224.235 true
Thanks, it worked well after dropping duplicates following the join. However, it still takes too long, as DF1 has around 10,000,000 rows and DF2 around 10,000. Is there any way to optimise it? DF1 has some duplicate IP addresses (all IPs in DF2 are unique, and I need to keep all the duplicates in DF1), so I wonder if I can use something like a memoisation technique with PySpark.
– andrewshih
2 days ago
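One possible optimisation (a sketch of my own, not from the thread): since DF2 is small, convert each of its entries to an inclusive numeric [start, end] interval, then replace crossJoin plus a UDF with a range join on ip_num between start and end, broadcasting the small bounds table. The interval computation itself is plain Python:

```python
def network_bounds(entry):
    """Convert 'a.b.c.d' or 'a.b.c.d/len' to an inclusive (start, end) numeric range."""
    ip, _, prefix = entry.partition("/")
    prefix_len = int(prefix) if prefix else 32  # a bare IP is a /32 range of one
    base = 0
    for octet in ip.split("."):
        base = (base << 8) | int(octet)
    mask = (0xFFFFFFFF << (32 - prefix_len)) & 0xFFFFFFFF
    start = base & mask                 # lowest address in the network
    end = start | (~mask & 0xFFFFFFFF)  # highest address in the network
    return start, end

print([hex(x) for x in network_bounds("192.0.2.0/23")])  # ['0xc0000200', '0xc00003ff']
```

In PySpark, one could then join with a condition like (A.ip_num >= B.start) & (A.ip_num <= B.end) after broadcasting the ~10,000-row bounds table, which Spark can evaluate natively instead of calling a Python UDF on every pair.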
I'm not familiar with "is in IP network range" logic - can you elaborate on that with an example? Also, why are 52.95.245.0 and 66.42.224.235 showing as false in the output? Those are clearly in B. Am I missing something?
– pault
Jun 28 at 14:43