Serialize Guava's MinMaxPriorityQueue





After a few days of investigating why my Flink application is not working properly, I've come to the conclusion that the problem lies in a MinMaxPriorityQueue I am using.





It seems that this structure is not serializable. I've tried several ways to serialize it:


env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[JavaSerializer])
env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[java.lang.Double]], classOf[ProtobufSerializer]);
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);



None of them worked.



However, I've found this: Serializing Guava's ImmutableTable



Is there an equivalent for MinMaxPriorityQueue, or a way to serialize it?



Update



I've translated Tomasz's serializer into Scala:


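import java.util.Comparator

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import com.google.common.base.Preconditions
import com.google.common.collect.MinMaxPriorityQueue
import org.slf4j.LoggerFactory

class MinMaxPriorityQueueSerializer extends Serializer[MinMaxPriorityQueue[Object]] {
  private[this] val log = LoggerFactory.getLogger(this.getClass)
  setImmutable(false)
  setAcceptsNull(false)

  val OPTIMIZE_POSITIVE = true

  override def read(kryo: Kryo, input: Input, aClass: Class[MinMaxPriorityQueue[Object]]): MinMaxPriorityQueue[Object] = {
    log.error("Kryo READ")
    // Read back in the order written: comparator, element count, then the elements.
    // Comparator is the type that MinMaxPriorityQueue.comparator() returns.
    val comparator = kryo.readClassAndObject(input).asInstanceOf[Comparator[Object]]
    val size = input.readInt(OPTIMIZE_POSITIVE)

    val queue: MinMaxPriorityQueue[Object] = MinMaxPriorityQueue.orderedBy(comparator)
      .expectedSize(size)
      .create()

    // `until` is exclusive, so exactly `size` elements are read.
    // `0 to size` is inclusive and would read one element more than was written.
    (0 until size).foreach(_ => queue.offer(kryo.readClassAndObject(input)))

    queue
  }

  override def write(kryo: Kryo, output: Output, queue: MinMaxPriorityQueue[Object]): Unit = {
    log.error("Kryo WRITE")
    kryo.writeClassAndObject(output, queue.comparator)

    val declaredSize = queue.size
    output.writeInt(declaredSize, OPTIMIZE_POSITIVE)

    // Write every element and count them to cross-check the declared size.
    val actualSize = queue.toArray.foldLeft(0) {
      case (z, q) =>
        kryo.writeClassAndObject(output, q)
        z + 1
    }

    Preconditions.checkState(
      declaredSize == actualSize,
      "Declared size (%s) different than actual size (%s)", declaredSize, actualSize)
  }
}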



And I configured Kryo in Flink to use that serializer:


env.getConfig.addDefaultKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[MinMaxPriorityQueueSerializer])

env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[MinMaxPriorityQueueSerializer])



However, it seems the serializer never gets called, since the output of log.error("Kryo READ") and log.error("Kryo WRITE") does not appear anywhere in the logs.



And the transformation still returns an empty MinMaxPriorityQueue, even though I am updating it.



Update 2



I've implemented the SerializerTester, but I am getting a bufferUnderflow:


import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import com.google.common.collect.MinMaxPriorityQueue
import org.objenesis.strategy.StdInstantiatorStrategy

object Main {

  def main(args: Array[String]): Unit = {
    val tester = new MinMaxPriorityQueueSerializerTester()

    val inQueue: MinMaxPriorityQueue[java.lang.Double] = MinMaxPriorityQueue.create()
    inQueue.add(1.0)

    // Round trip through an in-memory buffer.
    val outputStream = new ByteArrayOutputStream()
    tester.serialize(outputStream, inQueue)

    val inputStream = new ByteArrayInputStream(outputStream.toByteArray())
    val outQueue: MinMaxPriorityQueue[java.lang.Double] = tester.deserialize(inputStream)

    System.out.println(inQueue)
    System.out.println(outQueue)
  }
}

class MinMaxPriorityQueueSerializerTester {
  val kryo = new Kryo
  kryo.setInstantiatorStrategy(new StdInstantiatorStrategy)
  registerMinMaxSerializer()
  // allowForClassesWithoutNoArgConstructor() // needed to serialize Ordering

  def registerMinMaxSerializer(): Unit = {
    kryo.addDefaultSerializer(classOf[MinMaxPriorityQueue[java.lang.Double]], new MinMaxPriorityQueueSerializer())
  }

  def serialize(out: OutputStream, queue: MinMaxPriorityQueue[java.lang.Double]): Unit = {
    val output = new Output(out)
    kryo.writeClassAndObject(output, queue)
    // kryo.writeObject(output, queue)
    output.flush()
  }

  def deserialize(in: InputStream): MinMaxPriorityQueue[java.lang.Double] = {
    val input = new Input(in)
    // kryo.readObject(input, classOf[MinMaxPriorityQueue[java.lang.Double]])
    kryo.readClassAndObject(input).asInstanceOf[MinMaxPriorityQueue[java.lang.Double]]
  }
}





You can create an issue in Guava's Github repo and request that feature. There was issue #615, which mentioned MinMaxPriorityQueue, but it wasn't included in the scope of the ticket in the end.
– Xaerxess
Jun 28 at 16:20







@Xaerxess, thanks, I've opened it: github.com/google/guava/issues/3192
– elbaulp
Jun 28 at 16:57





@elbaulp Could you post the stack trace of the bufferUnderflow?
– Tomasz Linkowski
Jun 29 at 10:26





Never mind, I switched to this implementation of IntervalHeap: github.com/allenbh/gkutil_java/blob/master/src/gkimfl/util/… as it works out of the box. Thank you for your help, I am accepting your answer.
– elbaulp
2 days ago





@elbaulp Thanks, even though my answer didn't really solve your problem. You might consider posting a separate answer mentioning how you solved it in case anyone else has a similar problem.
– Tomasz Linkowski
2 days ago




2 Answers



You can use a custom Kryo Serializer.





Here is a sample one (in Java):


import java.util.Comparator;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.base.Preconditions;
import com.google.common.collect.MinMaxPriorityQueue;

class MinMaxPriorityQueueSerializer extends Serializer<MinMaxPriorityQueue<Object>> {

    private static final boolean OPTIMIZE_POSITIVE = true;

    protected MinMaxPriorityQueueSerializer() {
        setAcceptsNull(false);
        setImmutable(false);
    }

    @Override
    public void write(Kryo kryo, Output output, MinMaxPriorityQueue<Object> queue) {
        // Layout: comparator, element count, then each element.
        kryo.writeClassAndObject(output, queue.comparator());

        int declaredSize = queue.size();
        output.writeInt(declaredSize, OPTIMIZE_POSITIVE);

        int actualSize = 0;
        for (Object element : queue) {
            kryo.writeClassAndObject(output, element);
            actualSize++;
        }

        Preconditions.checkState(
                declaredSize == actualSize,
                "Declared size (%s) different than actual size (%s)", declaredSize, actualSize
        );
    }

    @Override
    public MinMaxPriorityQueue<Object> read(Kryo kryo, Input input, Class<MinMaxPriorityQueue<Object>> type) {
        @SuppressWarnings("unchecked")
        Comparator<Object> comparator = (Comparator<Object>) kryo.readClassAndObject(input);
        int size = input.readInt(OPTIMIZE_POSITIVE);

        MinMaxPriorityQueue<Object> queue = MinMaxPriorityQueue.orderedBy(comparator)
                .expectedSize(size)
                .create();

        // Read exactly as many elements as were written.
        for (int i = 0; i < size; ++i) {
            queue.offer(kryo.readClassAndObject(input));
        }
        return queue;
    }
}
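The serializer writes a size prefix followed by the elements, so the reader has to consume exactly that many elements. The sketch below illustrates only that layout, under loud assumptions: it uses java.util.PriorityQueue and plain JDK data streams as stand-ins for MinMaxPriorityQueue and Kryo, and the class and method names are hypothetical. It also shows what happens when a reader loops one time too many (for example with an inclusive range such as Scala's `0 to size` instead of `0 until size`): the stream runs out of bytes, which in Kryo typically surfaces as a buffer underflow.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.PriorityQueue;

// Hypothetical stand-in: JDK streams and PriorityQueue instead of Kryo and MinMaxPriorityQueue.
final class SizePrefixedQueueCodec {

    // Writes the element count first, then every element.
    static byte[] write(PriorityQueue<Integer> queue) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(queue.size());
            for (Integer element : queue) {
                out.writeInt(element);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the count, then exactly that many elements (indices 0 to size - 1).
    static PriorityQueue<Integer> read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            // PriorityQueue rejects an initial capacity below 1.
            PriorityQueue<Integer> queue = new PriorityQueue<>(Math.max(1, size));
            for (int i = 0; i < size; i++) {
                queue.offer(in.readInt());
            }
            return queue;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deliberately reads one element too many (i <= size) to show the failure mode.
    static boolean readOneTooManyFails(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            for (int i = 0; i <= size; i++) {
                in.readInt();
            }
            return false;
        } catch (IOException e) {
            // EOFException: the reader asked for more bytes than were written.
            return true;
        }
    }

    public static void main(String[] args) {
        byte[] data = write(new PriorityQueue<>(Arrays.asList(5, 2, 7, 2, 4)));
        System.out.println(read(data));
        System.out.println(readOneTooManyFails(data));
    }
}
```

If a round trip fails with an underflow, comparing the number of reads against the number of writes is a reasonable first check.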



Here is how you could use it:


import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Comparator;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.MinMaxPriorityQueue;
import org.objenesis.strategy.StdInstantiatorStrategy;

class MinMaxPriorityQueueSerializerTester {

    public static void main(String[] args) {
        MinMaxPriorityQueueSerializerTester tester = new MinMaxPriorityQueueSerializerTester();

        MinMaxPriorityQueue<Integer> inQueue = MinMaxPriorityQueue.<Integer>orderedBy(Comparator.reverseOrder())
                .create(Arrays.asList(5, 2, 7, 2, 4));

        // Round trip through an in-memory buffer.
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        tester.serialize(outputStream, inQueue);

        ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
        @SuppressWarnings("unchecked")
        MinMaxPriorityQueue<Integer> outQueue = (MinMaxPriorityQueue<Integer>) tester.deserialize(inputStream);

        System.out.println(inQueue);
        System.out.println(outQueue);
    }

    private final Kryo kryo;

    public MinMaxPriorityQueueSerializerTester() {
        this.kryo = new Kryo();
        registerMinMaxSerializer();
        allowForClassesWithoutNoArgConstructor(); // needed to serialize Ordering
    }

    private void registerMinMaxSerializer() {
        kryo.addDefaultSerializer(MinMaxPriorityQueue.class, new MinMaxPriorityQueueSerializer());
    }

    private void allowForClassesWithoutNoArgConstructor() {
        ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
                .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
    }

    public void serialize(OutputStream out, MinMaxPriorityQueue<?> queue) {
        // Closing the Output flushes it to the underlying stream.
        try (Output output = new Output(out)) {
            kryo.writeObject(output, queue);
        }
    }

    public MinMaxPriorityQueue<?> deserialize(InputStream in) {
        try (Input input = new Input(in)) {
            return kryo.readObject(input, MinMaxPriorityQueue.class);
        }
    }
}





Thanks, I've tried it, with no luck :-(
– elbaulp
Jun 29 at 8:46





@elbaulp I tested this code in Java, and it worked. Have you tested the serialization in Scala (outside of Flink)? If not, please do (as I did in MinMaxPriorityQueueSerializerTester), and then we will know more.
– Tomasz Linkowski
Jun 29 at 8:57







I am getting a buffer underflow, see update 2
– elbaulp
Jun 29 at 10:04




I finally gave up and switched to a different data structure, which I made serializable with java.io.Serializable.



This data structure is an IntervalHeap implemented here; I just made it Serializable in my project.



All works correctly now.
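As a rough illustration of what making a heap-like class Serializable involves, here is a minimal sketch. It rests on assumptions: BoundedSortedBag is a hypothetical stand-in written for this example, not the IntervalHeap mentioned above, and it keeps its elements in a sorted list rather than a real interval heap. The point is only that implementing java.io.Serializable, with fields that are themselves serializable, is enough for Java serialization to round-trip the structure.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;

// Hypothetical stand-in for a double-ended, size-bounded queue; not the real IntervalHeap.
final class BoundedSortedBag implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int maximumSize;
    // ArrayList and Double are both Serializable, so no custom writeObject is needed.
    private final ArrayList<Double> sorted = new ArrayList<>();

    BoundedSortedBag(int maximumSize) {
        this.maximumSize = maximumSize;
    }

    // Inserts in sorted position and evicts the largest element when over capacity.
    void add(double value) {
        int index = Collections.binarySearch(sorted, value);
        sorted.add(index >= 0 ? index : -index - 1, value);
        if (sorted.size() > maximumSize) {
            sorted.remove(sorted.size() - 1);
        }
    }

    double min() {
        return sorted.get(0);
    }

    double max() {
        return sorted.get(sorted.size() - 1);
    }

    int size() {
        return sorted.size();
    }

    // Serializes the bag to bytes and reads it back with standard Java serialization.
    static BoundedSortedBag roundTrip(BoundedSortedBag bag) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(bag);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (BoundedSortedBag) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If the structure holds a custom comparator, that comparator would need to be Serializable as well for this approach to work.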





