Erlang Map/Reduce Job in Riak using a Ruby Client

February 10, 2013

Note: I am horrible at Erlang, but have figured out enough to construct a Map/Reduce job. Hopefully these notes serve as more than a warning.

It’s rarely possible to know every way you will want to access your data. Riak has secondary indexes (2i), but if you don’t have one that represents what you want to query, populating one can be time-consuming when you have a lot of documents. Ad-hoc queries are rarely where a database shines, but when you have a one-off job, sometimes Map/Reduce is the only option you have.

In the example code below, I’m using the Riak Ruby Client to talk to a Riak ring. I’m getting my client from Ripple because it is already set up by the time this method is called. Create a client in whatever way makes sense for your application. Look below the code example for some line-specific notes.

Erlang Notes

  • lines 17, 38 – Define the language of the map and reduce phases, respectively. Yes, this means you can use a different language in the map phase than you do in the reduce phase (e.g. map: erlang, reduce: javascript).
  • lines 19-35, 40-42 – This is the real meat of the processing puzzle. When defining Erlang functions here, you have to use anonymous functions which start with fun.
  • line 19 – Defining these functions requires some knowledge of what’s passed in, so let’s look at the arguments:
    • Obj – Riak object/document as retrieved from the bucket.
    • _KeyData – Information about the document’s key.
    • _Arg – Static argument passed by the caller into the map phase.
  • lines 20-21 – Turn the Riak document into a structure. We store our data as JSON, so we use mochijson:decode to transform the data.
  • lines 23-28 – Define a function to help with picking the data that matches the supplied argument(s).
  • lines 30-34 – Use the supplied argument to filter the data we want. Return an empty list ([]) if the data doesn’t match our query.
  • line 40 – Again with the function definitions and arguments.
    • Values – Result from the map phase.
    • _Arg – Static argument passed by the caller into the reduce phase.
  • line 41 – Artificially limit the results by a supplied argument.

General Processing Notes

  • line 49 – Using a list of keys will help performance greatly. See below for suggestions on obtaining a list of keys.
  • line 52 – To help with VM memory, and to imitate some semblance of transactionality, I run things in batches.
  • lines 54-56 – This defines the job itself. The map phase or the reduce phase can be omitted, but not both. The keep argument determines whether the results of a phase are included in the final output. I tend to use keep=true only on the last phase run (i.e. the map phase if there is no reduce; the reduce phase otherwise).
  • line 58 – Add each bucket/key pair to the job. The key can be omitted by specifying just the bucket, but this will cause a full bucket scan. I’ve had problems with blowing out the VM memory when not specifying the keys and iterating over a large bucket.
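For anyone who wants to see the overall shape of such a job outside of Ruby, here is a minimal Python sketch that builds the JSON body Riak’s HTTP /mapred endpoint accepts: explicit bucket/key inputs, an Erlang map phase and an Erlang reduce phase written as anonymous fun source, keep=true only on the final phase, and the key list split into batches. It is not the code the notes above refer to. The bucket name, the "status" field, the batch size and the helper names (build_job, batched_jobs) are hypothetical, and the Erlang bodies are illustrations rather than the original functions. As far as I know, Riak only accepts Erlang functions as source strings when allow_strfun is enabled in the riak_kv section of app.config.

```python
import json

# Hypothetical Erlang map phase: emit the key of every JSON document whose
# "status" field equals the phase argument. mochijson2 decodes objects to
# {struct, Proplist} with binary keys; this assumes the JSON string arg
# also reaches the function as a binary.
MAP_SOURCE = """
fun(Obj, _KeyData, Arg) ->
    {struct, Props} = mochijson2:decode(riak_object:get_value(Obj)),
    case proplists:get_value(<<"status">>, Props) of
        Arg -> [riak_object:key(Obj)];
        _ -> []
    end
end.
"""

# Hypothetical Erlang reduce phase: artificially cap the number of results.
REDUCE_SOURCE = "fun(Values, Limit) -> lists:sublist(Values, Limit) end."


def build_job(bucket, keys, status, limit):
    """Return one /mapred request body (as a dict) for the given keys."""
    return {
        # Explicit bucket/key pairs avoid a full bucket scan.
        "inputs": [[bucket, key] for key in keys],
        "query": [
            {"map": {"language": "erlang", "source": MAP_SOURCE,
                     "arg": status, "keep": False}},
            # keep=True only on the last phase that runs.
            {"reduce": {"language": "erlang", "source": REDUCE_SOURCE,
                        "arg": limit, "keep": True}},
        ],
    }


def batched_jobs(bucket, keys, status, limit, batch_size=500):
    """Yield JSON-encoded job bodies, batch_size keys at a time."""
    for start in range(0, len(keys), batch_size):
        batch = keys[start:start + batch_size]
        yield json.dumps(build_job(bucket, batch, status, limit))
```

Each body would then be POSTed with a Content-Type of application/json to the /mapred resource of a node (port 8098 by default). Keeping each batch small is the same memory-saving idea as running the job in batches.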

Obtaining Document Keys

I get a list of keys independently of my Map/Reduce job to take some of the overhead out of the M/R job itself. I’ve used a couple of different methods to get a list of keys to process.

  1. Query keys from an index. If you have an index you can lean on, querying it for the keys to work through will cut down a lot of overhead.
  2. Stream key list from the bucket. This can take a while and add significant overhead to the ring, but is useful if you need all keys in the bucket. When streaming keys, the multipart response produces what looks like multiple documents. The pattern, {"keys": [..]}, will show up multiple times and require some massaging before it can be read by a JSON reader.
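The “massaging” for streamed keys can be as simple as decoding one JSON object after another. A minimal Python sketch, assuming the whole streamed response body (for example from GET /buckets/<bucket>/keys?keys=stream) has already been read into a string; parse_streamed_keys is a hypothetical helper, and any chunk without a "keys" field is simply skipped:

```python
import json


def parse_streamed_keys(body):
    """Collect keys from a body made of back-to-back {"keys": [...]} objects."""
    decoder = json.JSONDecoder()
    keys = []
    pos = 0
    while pos < len(body):
        # raw_decode does not skip leading whitespace, so do it here.
        while pos < len(body) and body[pos].isspace():
            pos += 1
        if pos >= len(body):
            break
        # Decode one JSON object and continue from where it ended.
        chunk, pos = decoder.raw_decode(body, pos)
        keys.extend(chunk.get("keys", []))
    return keys
```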

Given my ignorance of Erlang and Riak, I’m sure there are better ways to accomplish these same steps. I’ve performed the same work using JavaScript with similar results, but JavaScript is handled outside of the Erlang VM and tends not to be as fast as Erlang. For M/R jobs that you will run more than once, it is suggested that you compile the Erlang functions and put the compiled modules on the servers in the ring.

At this point, I’m just parroting what I’ve heard and can only cause confusion saying more. Hopefully this is more helpful than confusing. Be sure to check #riak on Freenode for some really great people and help.

Categories: development

Find activate-immediately services that lack an activate method

July 30, 2012

Immediate service activation in OSGi can be tricky, but there are some basic rules to consider. Another point to think about is what happens when a service doesn’t contain an activate method. The code base I work in uses Felix’s SCR annotations, which makes this search pretty concise. I also assume that the code is in git. If yours isn’t, you should be able to replace git grep with find <dir> -type f -exec <grep-fu here>.

Find activate-immediately services

Using a little git and grep-fu, we can find our services that activate immediately.

git grep -l 'immediate[[:space:]]\?=[[:space:]]\?true'

The command above makes a few assumptions:

  1. We use SCR annotations that allow immediate = (true|false)
  2. We almost never use immediate = true outside of the @Component annotation.

Find services that have an activate method

For our next trick, we’ll find the set of files that have an activate method as noted by the use of an @Activate annotation.

git grep -l '@Activate'

Find the intersection of activate-immediately services that lack an activate method

Now for the fun part. With the information we’ve grepped above, we need the files that are in set 1 but not in set 2. There’s a great little tool in Linux called comm that can help us with set operations. It needs sorted input to work correctly, so we’ll dress up our previous commands and send them in.

comm -23 <(git grep -l 'immediate[[:space:]]\?=[[:space:]]\?true' | sort) <(git grep -l '@Activate' | sort)

The -23 argument tells comm to suppress lines unique to the second set (the activate method list) and lines that appear in both sets (immediate + activate). This leaves us with services that don’t have an activate method. If you want to see services that are immediate and have an activate method, change the argument to -12.
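If your code isn’t in git, the same set difference can be done without grep or comm. A minimal Python sketch, assuming plain-text source files under a single root directory; the function names are hypothetical, and the regular expression only approximates the grep pattern above ([ \t]? stands in for [[:space:]]\?):

```python
import os
import re

# Approximation of 'immediate[[:space:]]\?=[[:space:]]\?true'.
IMMEDIATE = re.compile(r"immediate[ \t]?=[ \t]?true")
ACTIVATE = re.compile(r"@Activate")


def files_matching(root, pattern):
    """Return the set of paths (relative to root) whose contents match pattern."""
    matches = set()
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            with open(path, encoding="utf-8", errors="ignore") as handle:
                if pattern.search(handle.read()):
                    matches.add(os.path.relpath(path, root))
    return matches


def immediate_without_activate(root):
    """Like comm -23: files in the immediate set but not in the @Activate set."""
    return sorted(files_matching(root, IMMEDIATE) - files_matching(root, ACTIVATE))


def immediate_with_activate(root):
    """Like comm -12: files that appear in both sets."""
    return sorted(files_matching(root, IMMEDIATE) & files_matching(root, ACTIVATE))
```

Python’s sets don’t need sorted input, so the sort steps from the shell version disappear; the results are sorted only to keep the output stable.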

Categories: development, osgi

Understanding the ‘unresolved constraint’, ‘missing requirement’ message from Apache Felix Pt. 2

July 11, 2012

We previously took a look at Felix’s unresolved constraint message. I started testing with Felix 4.0.2 today and realized the output for an unresolved constraint has changed a bit.

ERROR: Bundle org.sakaiproject.nakamura.world [76] Error starting file:bundles/org.sakaiproject.nakamura.world_1.4.0.SNAPSHOT.jar (org.osgi.framework.BundleException: Unresolved constraint in bundle org.sakaiproject.nakamura.world [76]: Unable to resolve 76.0: missing requirement [76.0] osgi.wiring.package; (&(osgi.wiring.package=javax.servlet)(version>=3.0.0)))
11.07.2012 17:01:43.297 *ERROR* [FelixDispatchQueue] org.sakaiproject.nakamura.world FrameworkEvent ERROR (org.osgi.framework.BundleException: Unresolved constraint in bundle org.sakaiproject.nakamura.world [76]: Unable to resolve 76.0: missing requirement [76.0] osgi.wiring.package; (&(osgi.wiring.package=javax.servlet)(version>=3.0.0))) org.osgi.framework.BundleException: Unresolved constraint in bundle org.sakaiproject.nakamura.world [76]: Unable to resolve 76.0: missing requirement [76.0] osgi.wiring.package; (&(osgi.wiring.package=javax.servlet)(version>=3.0.0))
 at org.apache.felix.framework.Felix.resolveBundleRevision(Felix.java:3826)
 at org.apache.felix.framework.Felix.startBundle(Felix.java:1868)
 at org.apache.felix.framework.Felix.setActiveStartLevel(Felix.java:1191)
 at org.apache.felix.framework.FrameworkStartLevelImpl.run(FrameworkStartLevelImpl.java:295)
 at java.lang.Thread.run(Thread.java:679)

We have the same “unresolved constraint … missing requirement” message as before, but the part we’re interested in has changed a bit. Let’s break apart that first message.

(org.osgi.framework.BundleException: Unresolved constraint in bundle org.sakaiproject.nakamura.world [76]: Unable to resolve 76.0: missing requirement [76.0] osgi.wiring.package; (&(osgi.wiring.package=javax.servlet)(version>=3.0.0)))

Given what we know from last time, the interesting bits above are:

Unresolved constraint in bundle org.sakaiproject.nakamura.world [76]

This tells you what bundle had an issue trying to resolve a constraint. The next part is a bit less obvious but can be broken up.

osgi.wiring.package; (&(osgi.wiring.package=javax.servlet)(version>=3.0.0)))

osgi.wiring.package looks pretty foreign, huh? Disregard that and you see javax.servlet and version>=3.0.0. Tada! That’s the good stuff. So, find the bundle that exports javax.servlet at version 3.0.0 or later and you’re on your way. (Hint: maybe javax.servlet:javax.servlet-api:3.0.0 or org.ops4j.pax.web:pax-web-jetty-bundle:2.0.1 is what you’re looking for.)
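Picking the package and version out of the filter can be automated when the log is long. A rough Python sketch (describe_requirement is a hypothetical helper) that handles both the Felix 4 form, osgi.wiring.package=..., and the older package=... form; it only understands the simple filters shown in these posts, not the full LDAP filter syntax:

```python
import re

# "(osgi.wiring.package=javax.servlet)" in Felix 4 output,
# "(package=javax.servlet)" in older Felix output.
PACKAGE = re.compile(r"\((?:osgi\.wiring\.)?package=([^)]+)\)")
# Version clauses, including negated ones such as "(!(version>=10.0.0))".
VERSION = re.compile(r"(\(!)?\(version([<>]?=)([^)]+)\)")


def describe_requirement(filter_text):
    """Return (package, [constraints]) for a Felix missing-requirement filter."""
    package = PACKAGE.search(filter_text)
    constraints = []
    for negated, op, version in VERSION.findall(filter_text):
        constraints.append(("not " if negated else "") + "version" + op + version)
    return (package.group(1) if package else None), constraints
```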

Categories: osgi

Getting started with Pax Runner

After fighting through a Maven assembly for a small project, I just couldn’t take that headache again. I’ve used Apache Sling’s Maven Launchpad Plugin to put together a standalone OSGi server, but Launchpad doesn’t let you pick which OSGi container you deploy to or which version of Felix gets used. I’ve started working with Pax Runner because a) it looks pretty nifty and b) the Pax folks are doing great stuff for OSGi deployers out there.

Pax Runner has a few sweet features I’m really digging right now.

Different OSGi platforms and versions

I generally develop on Apache Felix, but I should be able to run in any other OSGi container, right? To test that theory, I can ask Pax Runner to load my profile using a different platform (--platform; defaults to ‘felix’) and a specific version of that platform (--version).

If I want to test my setup with Equinox I just run:

pax-run --platform=equinox awesome-profile.composite

Knopflerfish you say?

pax-run --platform=knopflerfish awesome-profile.composite

What about an older version of Felix? Easy!

pax-run --platform=felix --version=3.0.8 awesome-profile.composite

Deploy multiple profiles and build composite profiles

I plan to run my project with a single profile, but if you find the need to include other profiles it’s just another command line switch:

pax-run --profiles=config,log awesome-profile.composite

“But I want a specific version of a profile.” And you should! So use this:

pax-run --profiles=config/1.0.0,log/1.2.0 awesome-profile.composite

Profiles? What’s this crazy talk you speak?!

So, the root of all this chatter is Pax Runner profiles. I can barely speak to the topic, but I’ll attempt to anyway. (Don’t trust me; read the documentation.)

A short glimpse into my profile file shows that I just include bundles that I know to live in a Maven repository:

scan-bundle:mvn:commons-io/commons-io/1.4@1
scan-bundle:mvn:commons-fileupload/commons-fileupload/1.2.2@1
scan-bundle:mvn:commons-collections/commons-collections/3.2.1@1
scan-bundle:mvn:commons-lang/commons-lang/2.6@1
scan-bundle:mvn:commons-pool/commons-pool/1.5.6@1
scan-bundle:mvn:commons-codec/commons-codec/1.5@1

A quick explanation of these lines is simply:
scan-bundle:mvn:<groupId>/<artifactId>/<version>@<startLevel>
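Because each profile line follows that fixed pattern, it is easy to read or generate programmatically. A minimal Python sketch with hypothetical helper names; it only handles the three-part groupId/artifactId/version form shown above, and treating the @startLevel suffix as optional is my assumption:

```python
from collections import namedtuple

ScanBundle = namedtuple("ScanBundle", "group_id artifact_id version start_level")
PREFIX = "scan-bundle:mvn:"


def parse_scan_bundle(line):
    """Split scan-bundle:mvn:<groupId>/<artifactId>/<version>@<startLevel>."""
    if not line.startswith(PREFIX):
        raise ValueError("not a scan-bundle:mvn line: %r" % line)
    coordinates, _, start_level = line[len(PREFIX):].partition("@")
    # Raises ValueError for anything other than exactly three segments.
    group_id, artifact_id, version = coordinates.split("/")
    return ScanBundle(group_id, artifact_id, version,
                      int(start_level) if start_level else None)


def format_scan_bundle(bundle):
    """Inverse of parse_scan_bundle, handy for generating profile lines."""
    line = "%s%s/%s/%s" % (PREFIX, bundle.group_id, bundle.artifact_id, bundle.version)
    if bundle.start_level is None:
        return line
    return "%s@%d" % (line, bundle.start_level)
```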

For those looking for more OSGi goodness, Pax Web has really stepped up with the 2.0.0 release. You can configure your Jetty server by deploying a bundle fragment with the appropriate jetty.xml file. Awesome!

Categories: osgi

Understanding the ‘unresolved constraint’, ‘missing requirement’ message from Apache Felix

January 19, 2012

It’s pretty common while developing an OSGi bundle that your imports and exports won’t quite match what you need or what exists on the server you’re deploying to. This can show up as NoClassDefFoundError, ClassNotFoundException or as log output in a stacktrace from bundle resolution. Hall, Pauls, McCullough and Savage did a great job of covering NCDFE and CNFE in “OSGi In Action” (chapter 8), so let’s take a look at figuring out what the bundle resolution stacktrace is telling us. (I make nothing from the sales of “OSGi In Action” and suggest it to anyone interested in OSGi.)

Just as learning to read the stacktrace from a Java exception is key to debugging, so is learning to read the dependency resolution messages from an OSGi container. Below is the output from Apache Felix when it encountered a missing dependency required by a bundle:

ERROR: Bundle org.sakaiproject.nakamura.webconsole.solr [124]: Error starting slinginstall:org.sakaiproject.nakamura.webconsole.solr-1.2-SNAPSHOT.jar (org.osgi.framework.BundleException: Unresolved constraint in bundle org.sakaiproject.nakamura.webconsole.solr [124]: Unable to resolve 124.0: missing requirement [124.0] package; (package=org.apache.solr.client.solrj) [caused by: Unable to resolve 84.0: missing requirement [84.0] package; (package=org.sakaiproject.nakamura.api.lite) [caused by: Unable to resolve 86.0: missing requirement [86.0] package; (&(package=com.google.common.collect)(version>=9.0.0)(!(version>=10.0.0)))]])
org.osgi.framework.BundleException: Unresolved constraint in bundle org.sakaiproject.nakamura.webconsole.solr [124]: Unable to resolve 124.0: missing requirement [124.0] package; (package=org.apache.solr.client.solrj) [caused by: Unable to resolve 84.0: missing requirement [84.0] package; (package=org.sakaiproject.nakamura.api.lite) [caused by: Unable to resolve 86.0: missing requirement [86.0] package; (&(package=com.google.common.collect)(version>=9.0.0)(!(version>=10.0.0)))]]
    at org.apache.felix.framework.Felix.resolveBundle(Felix.java:3443)
    at org.apache.felix.framework.Felix.startBundle(Felix.java:1727)
    at org.apache.felix.framework.Felix.setActiveStartLevel(Felix.java:1156)
    at org.apache.felix.framework.StartLevelImpl.run(StartLevelImpl.java:264)
    at java.lang.Thread.run(Thread.java:619)

What you have here is a stacktrace with a lengthy message. The important part of the stacktrace for us is the message.

ERROR: Bundle org.sakaiproject.nakamura.webconsole.solr [124]: Error starting slinginstall:org.sakaiproject.nakamura.webconsole.solr-1.2-SNAPSHOT.jar (org.osgi.framework.BundleException: Unresolved constraint in bundle org.sakaiproject.nakamura.webconsole.solr [124]: Unable to resolve 124.0: missing requirement [124.0] package; (package=org.apache.solr.client.solrj) [caused by: Unable to resolve 84.0: missing requirement [84.0] package; (package=org.sakaiproject.nakamura.api.lite) [caused by: Unable to resolve 86.0: missing requirement [86.0] package; (&(package=com.google.common.collect)(version>=9.0.0)(!(version>=10.0.0)))]])

This message is pretty simple but the structure is common for nastier messages (i.e. deeper resolution paths before failure). Let’s pull it apart to see what’s happening in there.

ERROR: Bundle org.sakaiproject.nakamura.webconsole.solr [124]: Error starting slinginstall:org.sakaiproject.nakamura.webconsole.solr-1.2-SNAPSHOT.jar

This very first part tells us that an error occurred while trying to load the org.sakaiproject.nakamura.webconsole.solr bundle. Nice start, but not quite the crux of the matter. Let’s keep reading.

org.osgi.framework.BundleException: Unresolved constraint in bundle org.sakaiproject.nakamura.webconsole.solr [124]: Unable to resolve 124.0: missing requirement [124.0] package; (package=org.apache.solr.client.solrj) [caused by: Unable to resolve 84.0: missing requirement [84.0] package; (package=org.sakaiproject.nakamura.api.lite) [caused by: Unable to resolve 86.0: missing requirement [86.0] package; (&(package=com.google.common.collect)(version>=9.0.0)(!(version>=10.0.0)))]])

Phew, that’s a lot of text! This is the heart of what we need though, so let’s break it down to make more sense of it.

(
    org.osgi.framework.BundleException: Unresolved constraint in bundle org.sakaiproject.nakamura.webconsole.solr [124]: Unable to resolve 124.0: missing requirement [124.0] package; (package=org.apache.solr.client.solrj)
        [
            caused by: Unable to resolve 84.0: missing requirement [84.0] package; (package=org.sakaiproject.nakamura.api.lite)
            [
                 caused by: Unable to resolve 86.0: missing requirement [86.0] package; (&(package=com.google.common.collect)(version>=9.0.0)(!(version>=10.0.0)))
            ]
        ]
)

What are those [number]s in the message?

The numbers in the message tell us the bundle ID on the server.

Unresolved Package Name               Bundle ID Where Resolution Failed
org.apache.solr.client.solrj          124
org.sakaiproject.nakamura.api.lite    84
com.google.common.collect             86

Once you pull apart the message, it becomes more obvious that it has structure and meaning! The structure of the message tells us that bundle 124 depends on a package from bundle 84, which depends on a package from bundle 86, which is unable to resolve com.google.common.collect;version=[9.0.0, 10.0.0). The innermost (very last) message tells us the root of the problem: the dependency resolver was unable to find com.google.common.collect at version=[9.0.0, 10.0.0). Now we have somewhere to start digging.
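Unwrapping the nested [caused by: ...] sections by hand gets tedious for deeper chains. A rough Python sketch (resolution_chain and balanced are hypothetical helpers) that pulls out each bundle ID and its requirement filter, outermost first, so the last entry is the root of the problem; it assumes the message wording matches the Felix output shown in these posts:

```python
import re

# "Unable to resolve 84.0: missing requirement [84.0] package; " (the filter follows)
STEP = re.compile(r"Unable to resolve (\d+)\.\d+: missing requirement \[\d+\.\d+\] [\w.]+; ")


def balanced(text, start):
    """Return the parenthesised expression that begins at text[start]."""
    depth = 0
    for index in range(start, len(text)):
        if text[index] == "(":
            depth += 1
        elif text[index] == ")":
            depth -= 1
            if depth == 0:
                return text[start:index + 1]
    raise ValueError("unbalanced filter")


def resolution_chain(message):
    """List (bundle_id, filter) pairs from outermost to innermost cause."""
    return [(int(match.group(1)), balanced(message, match.end()))
            for match in STEP.finditer(message)]
```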

How To Fix This

I suggest one of the following steps:

  1. Add a bundle that exports the missing package with a version that matches the required version
  2. Change the version to match an exported package already on the server

In this particular environment, com.google.common.collect;version=10.0.0 is what our server has deployed. The requirement above specifically blocks any version outside the 9.x.x range. We generate the OSGi manifest with the Maven Bundle Plugin, which uses the BND tool to generate the manifest. In BND versions later than 2.1.0, the macro for versions was changed. Our solutions have ranged from rolling back to BND 2.1.0 to defining the macro differently. The result is the same either way: the version segment in the manifest header becomes com.google.common.collect;version>=9.0.0, which matches our deployed com.google.common.collect;version=10.0.0.
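The filter (version>=9.0.0)(!(version>=10.0.0)) is the OSGi version range [9.0.0,10.0.0), while a bare 9.0.0 in an Import-Package header means “9.0.0 or later.” A simplified Python sketch of that check (in_range and parse_version are hypothetical; it ignores version qualifiers and is not how Felix’s resolver is implemented) shows why 10.0.0 fails the first range and passes the second:

```python
def parse_version(text):
    """Return (major, minor, micro) as integers; any qualifier is ignored."""
    numbers = [int(part) for part in text.strip().split(".")[:3]]
    return tuple(numbers + [0] * (3 - len(numbers)))


def in_range(version, spec):
    """Check version against a "[low,high)"-style range or a bare minimum version."""
    v = parse_version(version)
    spec = spec.strip()
    if spec[0] not in "[(":
        # A bare version is a lower bound only: this version or anything later.
        return v >= parse_version(spec)
    low, high = spec[1:-1].split(",")
    # "[" / "]" are inclusive bounds, "(" / ")" are exclusive.
    low_ok = v >= parse_version(low) if spec[0] == "[" else v > parse_version(low)
    high_ok = v <= parse_version(high) if spec[-1] == "]" else v < parse_version(high)
    return low_ok and high_ok
```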


Notes about environment

The above message and stacktrace originated from a Sakai OAE environment, which is built on Apache Sling and therefore Apache Felix. We use an artifact ID that is the root package of the bundle (org.sakaiproject.nakamura.webconsole.solr). As a side effect, our bundle names look like package names in the message, but it gives us a very clear naming convention.

Using pygraphviz to plot OSGi bundle dependencies

September 5, 2011

I’ve been working on an OSGi project for the last few years. As with any project, evolutionary changes will eventually require some cleanup. As new bundles have been added over time, the graph of dependencies has started to get unwieldy in places. Even with good management of these dependencies, a nice visual layout can really help you see how your bundles are interconnected and give you the power to start separating connections if your graph starts to hit cyclical dependencies.

I drove around a few different visual tools in Eclipse (PDE visualization tool, m2eclipse dependency graph) but needed something that would narrow the view to just my project. I wanted to see not only what my bundles depend on within the project, but also what depends on a given bundle.

This was my first project with pygraphviz, but after I figured out which way the grain runs, I was able to give it a basic graph of my project and generate the diagrams I wanted. Once I let pygraphviz handle the graph traversal, things got better (less code, faster results).

Since I wanted to analyze the runtime dependencies of my server, I installed the Felix Remote Shell bundle along with the Felix Shell bundle to allow remote connections to management functionality. With these in place, I was able to connect via telnet and query for package level information to build my graph.

Using the code below, I was able to generate a graph of the entire project (successors) and a graph for each bundle in the graph (predecessors and successors). Some sample images are below the source code. Eventually I’ll add saving and opening of dot files to allow for analysis without a running server.

#! /usr/bin/env python
import os
import re
import telnetlib

import pygraphviz as pgv

# Address of the Felix Remote Shell used in this setup.
SHELL_HOST = 'localhost'
SHELL_PORT = 6666

# [   1] [Active     ] [   15] org.sakaiproject.nakamura.messaging (0.11.0.SNAPSHOT)
bundle_from_ps = re.compile(r'^\[\s*(?P<bundle_id>\d+)\]\s\[.+\]\s(?P<bundle_name>.+)\s')

# org.sakaiproject.nakamura.api.solr; version=0.0.0 -> org.sakaiproject.nakamura.solr [11]
bundle_from_req = re.compile(r'^.*-> (?P<bundle_name>.*) \[(?P<bundle_id>.*)\]$')

def run_shell_command(command):
    """Run one command through the Felix Remote Shell.
    Returns the output split into lines.
    """
    tn = telnetlib.Telnet(SHELL_HOST, SHELL_PORT)
    tn.write(('%s\nexit\n' % command).encode('ascii'))
    output = tn.read_all().decode('ascii', 'replace')
    tn.close()
    return output.split('\r\n')

def get_sakai_bundles():
    """Get a list of bundles that are created as part of Sakai OAE.
    Returns a dictionary of dict[bundle_name] = bundle_id.
    """
    bundles = {}
    for line in run_shell_command('ps -s'):
        if 'org.sakaiproject' not in line:
            continue
        m = bundle_from_ps.match(line)
        if m:  # skip anything that isn't a bundle listing
            bundles[m.group('bundle_name')] = m.group('bundle_id')
    return bundles

def get_package_reqs(bundle_id):
    """Gets the requirements (imports) for a given bundle.
    Returns a dictionary of dict[bundle_name] = bundle_id.

    Keyword arguments:
    bundle_id -- Bundle ID returned by the server in the output of
                 get_sakai_bundles()
    """
    reqs = {}
    for line in run_shell_command('inspect package requirement %s' % bundle_id):
        if not (line.startswith('org.sakaiproject') and line.endswith(']')):
            continue
        m = bundle_from_req.match(line)
        if m:
            reqs[m.group('bundle_name')] = m.group('bundle_id')
    return reqs

def build_bundle_graph():
    """Build an adjacency dict, dict[bundle_name] = [required bundle names],
    representing the connectivity within Sakai bundles.
    """
    bundles = {}
    for b_name, b_id in get_sakai_bundles().items():
        bundles[b_name] = list(get_package_reqs(b_id))
    return bundles

def draw_subgraph(name, graph, filename, successors=True):
    """Draw (write to disk) a subgraph that starts with or ends with
    the specified node.

    Keyword arguments:
    name -- name of the node to focus on
    graph -- graph of paths between bundles
    filename -- filename to write subgraph to
    successors -- whether to lookup successors or predecessors
    """
    if successors:
        nbunch = graph.successors(name)
    else:
        nbunch = graph.predecessors(name)

    # Only draw bundles that have neighbours in the requested direction.
    if nbunch:
        nbunch.append(name)
        subgraph = graph.subgraph(nbunch)
        subgraph.layout(prog='dot')
        subgraph.draw(os.path.join('graphviz', filename))

def main():
    if not os.path.isdir('graphviz'):
        os.mkdir('graphviz')

    bundles_graph = build_bundle_graph()

    # draw the whole graph
    graph = pgv.AGraph(data=bundles_graph, directed=True)
    # make sure bundles with no Sakai edges are still nodes in the graph
    graph.add_nodes_from(bundles_graph)
    graph.layout(prog='dot')
    graph.draw(os.path.join('graphviz', 'org.sakaiproject.nakamura.png'))

    # one successor graph and one predecessor graph per bundle
    for b_name in bundles_graph:
        draw_subgraph(b_name, graph, '%s.png' % b_name)
        draw_subgraph(b_name, graph, '%s-pred.png' % b_name, False)

if __name__ == '__main__':
    main()

Sakai OAE Bundle Dependency Graph

Sakai OAE Solr Bundle Predecessors

Sakai OAE Presence Bundle Successors
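pygraphviz’s successors() and predecessors() only look at direct neighbours, and they need a live AGraph. If you’d rather poke at the dictionary returned by build_bundle_graph() directly (for example, after saving it to disk so no running server is needed), a minimal plain-Python sketch could look like this; the helper names are hypothetical:

```python
def successors(graph, name):
    """Bundles that `name` imports packages from (direct edges only)."""
    return sorted(graph.get(name, []))


def predecessors(graph, name):
    """Bundles that import packages from `name`, found by inverting the edges."""
    return sorted(bundle for bundle, reqs in graph.items() if name in reqs)


def transitive_successors(graph, name):
    """Everything reachable from `name`; the seen set stops cycles from looping."""
    seen, stack = set(), [name]
    while stack:
        for nxt in graph.get(stack.pop(), []):
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    seen.discard(name)  # a bundle in a cycle shouldn't list itself
    return sorted(seen)
```

With a toy graph such as {"world": ["api", "solr"], "solr": ["api"], "api": []}, predecessors(graph, "api") returns ["solr", "world"].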

Categories: development, osgi

Committer Status on Apache Sling

February 16, 2011

I woke up this morning to the fantastic news that I’ve been offered committer status on Apache Sling! I gratefully and excitedly accepted the opportunity to become an Apache committer.
The various steps of the process are in motion, so expect to see more Sling posts from me as I ramp up to being a more productive member of Apache Sling.

Categories: development