Check if an IP address is in an IPNetwork with PySpark





With PySpark, I would like to join/merge dataframe A and dataframe B, flagging each IP address in A that falls within an IP network range in B or exactly matches an IP address in B.



Dataframe A contains IP addresses only, and dataframe B contains IP addresses or IP addresses with a CIDR suffix. Here is an example.


Dataframe A
+---------------+
| ip_address|
+---------------+
| 192.0.2.2|
| 164.42.155.5|
| 52.95.245.0|
| 66.42.224.235|
| ...|
+---------------+

Dataframe B
+---------------+
| ip_address|
+---------------+
| 123.122.213.34|
| 41.32.241.2|
| 66.42.224.235|
| 192.0.2.0/23|
| ...|
+---------------+



Then the expected output would be something like below:


+---------------+--------+
| ip_address| is_in_b|
+---------------+--------+
| 192.0.2.2| true| -> This is in the same network range as 192.0.2.0/23
| 164.42.155.5| false|
| 52.95.245.0| false|
| 66.42.224.235| true| -> This is in B
| ...| ...|
+---------------+--------+



The idea I first wanted to try was a UDF that compares the rows one by one and checks the IP range whenever a CIDR comes up, but it seems UDFs can't take multiple dataframes. I also tried converting dataframe B to a list and then comparing, but that is very inefficient and takes a long time, since the number of rows in A times the number of rows in B is over 100 million. Is there an efficient solution?



Edit:
For more detail, here is the code I used to check an address without PySpark or any library.


def cidr_to_netmask(c):
    # Build a dotted-quad netmask from a CIDR prefix length, e.g. 23 -> 255.255.254.0
    cidr = int(c)
    mask = (0xffffffff >> (32 - cidr)) << (32 - cidr)
    return (str((0xff000000 & mask) >> 24) + '.' +
            str((0x00ff0000 & mask) >> 16) + '.' +
            str((0x0000ff00 & mask) >> 8) + '.' +
            str(0x000000ff & mask))

def ip_to_numeric(ip):
    # Pack a dotted-quad address into a 32-bit integer.
    ip_num = 0
    for i, octet in enumerate(ip.split('.')):
        ip_num += int(octet) << (24 - (8 * i))
    return ip_num

def is_in_ip_network(ip, network_addr):
    # No CIDR suffix: plain string equality.
    if len(network_addr.split('/')) < 2:
        return ip == network_addr.split('/')[0]
    else:
        # Compare the network parts of both addresses under the subnet mask.
        network_ip, cidr = network_addr.split('/')
        subnet = cidr_to_netmask(cidr)
        return (ip_to_numeric(ip) & ip_to_numeric(subnet)) == (ip_to_numeric(network_ip) & ip_to_numeric(subnet))
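As a sanity check, the same membership test can be written with Python's standard-library ipaddress module (a sketch; the helper name is made up and not part of the original code):

```python
import ipaddress

def is_in_ip_network_stdlib(ip, network_addr):
    # Bare address (no CIDR suffix): plain equality.
    if '/' not in network_addr:
        return ip == network_addr
    # strict=False tolerates host bits set in the network address.
    return ipaddress.ip_address(ip) in ipaddress.ip_network(network_addr, strict=False)

print(is_in_ip_network_stdlib("192.0.2.2", "192.0.2.0/23"))      # True
print(is_in_ip_network_stdlib("164.42.155.5", "192.0.2.0/23"))   # False
print(is_in_ip_network_stdlib("66.42.224.235", "66.42.224.235")) # True
```

The results should agree with the hand-rolled functions above on all the sample addresses.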





I'm not familiar with "is in IP network range" logic- can you elaborate on that with an example? Also why are 52.95.245.0 and 66.42.224.235 showing as false in the output? Those are clearly in B. Am I missing something?
– pault
Jun 28 at 14:43




@pault I just wanted to show an output example, so I modified the sample dataframes. For a network range, gist.github.com/tott/7684443 is a good example, even though it's in PHP. In Python, I usually use the netaddr library to do the same thing, like IPAddress(x) in IPNetwork(y). netaddr.readthedocs.io/en/latest/tutorial_01.html
– andrewshih
Jun 28 at 16:14



1 Answer



You could use crossJoin and a UDF, but at the cost of a Cartesian product:



from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

data_1 = ["192.0.2.2", "164.42.155.5", "52.95.245.0", "66.42.224.235"]
data_2 = ["192.0.2.0/23", "66.42.224.235"]
DF1 = spark.createDataFrame([Row(ip=x) for x in data_1])
DF2 = spark.createDataFrame([Row(ip=x) for x in data_2])

# is_in_ip_network is the function from the question.
join_cond = udf(is_in_ip_network, BooleanType())

DF1.crossJoin(DF2).withColumn("match", join_cond(DF1.ip, DF2.ip))



The result looks similar to


ip             ip             match
192.0.2.2      192.0.2.0/23   true
192.0.2.2      66.42.224.235  false
164.42.155.5   192.0.2.0/23   false
164.42.155.5   66.42.224.235  false
52.95.245.0    192.0.2.0/23   false
52.95.245.0    66.42.224.235  false
66.42.224.235  192.0.2.0/23   false
66.42.224.235  66.42.224.235  true
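To cut the cost of evaluating a Python UDF over the full Cartesian product, one more scalable approach (a sketch, not part of the original answer; the helper name cidr_to_range is made up) is to convert each entry in DF2 to an inclusive numeric [start, end] range and each address in DF1 to a single 32-bit integer, so the join condition becomes a plain range comparison that Spark can evaluate natively:

```python
def cidr_to_range(network_addr):
    # Convert '192.0.2.0/23' (or a bare address) to an inclusive
    # (start, end) range of 32-bit integers.
    if '/' in network_addr:
        base, cidr = network_addr.split('/')
        host_bits = 32 - int(cidr)
    else:
        base, host_bits = network_addr, 0
    a, b, c, d = (int(o) for o in base.split('.'))
    num = (a << 24) | (b << 16) | (c << 8) | d
    start = (num >> host_bits) << host_bits  # zero out the host bits
    end = start + (1 << host_bits) - 1       # last address in the block
    return start, end

print(cidr_to_range("192.0.2.0/23"))   # (3221225984, 3221226495)
print(cidr_to_range("66.42.224.235"))  # bare address: start == end
```

With start and end as ordinary columns on the small dataframe, a join like DF1.join(broadcast(DF2), (DF1.ip_num >= DF2.start) & (DF1.ip_num <= DF2.end), "left") avoids the per-pair Python UDF call entirely; since DF2 is small it fits in a broadcast, so each executor only scans its own partition of DF1.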





Thanks, it worked well after dropping duplicates following the join. However, it still takes too long, as DF1 has around 10,000,000 rows and DF2 around 10,000. Is there any way to optimise it? DF1 has some duplicate IP addresses (all IPs in DF2 are unique, and I need to keep all the duplicates in DF1), so I wonder if I can use something like a memoisation technique with PySpark.
– andrewshih
2 days ago





